diff --git a/sync_diff_inspector/config/config.go b/sync_diff_inspector/config/config.go index 090b74c76..e3bb74005 100644 --- a/sync_diff_inspector/config/config.go +++ b/sync_diff_inspector/config/config.go @@ -374,6 +374,8 @@ type Config struct { CheckStructOnly bool `toml:"check-struct-only" json:"check-struct-only"` // experimental feature: only check table data without table struct CheckDataOnly bool `toml:"check-data-only" json:"-"` + // skip validation for tables that don't exist upstream or downstream + SkipNonExistingTable bool `toml:"skip-non-existing-table" json:"-"` // DMAddr is dm-master's address, the format should like "http://127.0.0.1:8261" DMAddr string `toml:"dm-addr" json:"dm-addr"` // DMTask string `toml:"dm-task" json:"dm-task"` @@ -411,6 +413,7 @@ func NewConfig() *Config { fs.IntVar(&cfg.CheckThreadCount, "check-thread-count", 4, "how many goroutines are created to check data") fs.BoolVar(&cfg.ExportFixSQL, "export-fix-sql", true, "set true if want to compare rows or set to false will only compare checksum") fs.BoolVar(&cfg.CheckStructOnly, "check-struct-only", false, "ignore check table's data") + fs.BoolVar(&cfg.SkipNonExistingTable, "skip-non-existing-table", false, "skip validation for tables that don't exist upstream or downstream") fs.BoolVar(&cfg.CheckDataOnly, "check-data-only", false, "ignore check table's struct") _ = fs.MarkHidden("check-data-only") diff --git a/sync_diff_inspector/diff.go b/sync_diff_inspector/diff.go index e7c04b651..e5b5e9f12 100644 --- a/sync_diff_inspector/diff.go +++ b/sync_diff_inspector/diff.go @@ -297,12 +297,16 @@ func (df *Diff) StructEqual(ctx context.Context) error { tableIndex = df.startRange.ChunkRange.Index.TableIndex } for ; tableIndex < len(tables); tableIndex++ { - isEqual, isSkip, err := df.compareStruct(ctx, tableIndex) - if err != nil { - return errors.Trace(err) + isEqual, isSkip, isAllTableExist := false, true, tables[tableIndex].TableLack + if common.AllTableExist(isAllTableExist) { + var 
err error + isEqual, isSkip, err = df.compareStruct(ctx, tableIndex) + if err != nil { + return errors.Trace(err) + } } - progress.RegisterTable(dbutil.TableName(tables[tableIndex].Schema, tables[tableIndex].Table), !isEqual, isSkip) - df.report.SetTableStructCheckResult(tables[tableIndex].Schema, tables[tableIndex].Table, isEqual, isSkip) + progress.RegisterTable(dbutil.TableName(tables[tableIndex].Schema, tables[tableIndex].Table), !isEqual, isSkip, isAllTableExist) + df.report.SetTableStructCheckResult(tables[tableIndex].Schema, tables[tableIndex].Table, isEqual, isSkip, isAllTableExist) } return nil } @@ -411,12 +415,21 @@ func (df *Diff) consume(ctx context.Context, rangeInfo *splitter.RangeInfo) bool node: rangeInfo.ToNode(), } defer func() { df.sqlCh <- dml }() + tableDiff := df.downstream.GetTables()[rangeInfo.GetTableIndex()] + schema, table := tableDiff.Schema, tableDiff.Table + id := rangeInfo.ChunkRange.Index if rangeInfo.ChunkRange.Type == chunk.Empty { dml.node.State = checkpoints.IgnoreState + // for tables that don't exist upstream or downstream + if !common.AllTableExist(tableDiff.TableLack) { + upCount := df.upstream.GetCountForLackTable(ctx, rangeInfo) + downCount := df.downstream.GetCountForLackTable(ctx, rangeInfo) + df.report.SetTableDataCheckResult(schema, table, false, int(upCount), int(downCount), upCount, downCount, id) + return false + } return true } - tableDiff := df.downstream.GetTables()[rangeInfo.GetTableIndex()] - schema, table := tableDiff.Schema, tableDiff.Table + var state string = checkpoints.SuccessState isEqual, upCount, downCount, err := df.compareChecksumAndGetCount(ctx, rangeInfo) @@ -447,7 +460,6 @@ func (df *Diff) consume(ctx context.Context, rangeInfo *splitter.RangeInfo) bool isEqual = isDataEqual } dml.node.State = state - id := rangeInfo.ChunkRange.Index df.report.SetTableDataCheckResult(schema, table, isEqual, dml.rowAdd, dml.rowDelete, upCount, downCount, id) return isEqual } diff --git 
a/sync_diff_inspector/progress/progress.go b/sync_diff_inspector/progress/progress.go index 69dcc2baa..c8f2176a8 100644 --- a/sync_diff_inspector/progress/progress.go +++ b/sync_diff_inspector/progress/progress.go @@ -20,6 +20,8 @@ import ( "os" "strings" "time" + + "github.com/pingcap/tidb-tools/sync_diff_inspector/source/common" ) type TableProgressPrinter struct { @@ -53,7 +55,9 @@ const ( TABLE_STATE_RESULT_FAIL_STRUCTURE_PASS table_state_t = 0x40 TABLE_STATE_RESULT_DIFFERENT table_state_t = 0x80 TABLE_STATE_HEAD table_state_t = 0xff - TABLE_STATE_RESULT_MASK table_state_t = 0xf0 + TABLE_STATE_RESULT_MASK table_state_t = 0xff0 + TABLE_STATE_NOT_EXSIT_UPSTREAM table_state_t = 0x100 + TABLE_STATE_NOT_EXSIT_DOWNSTREAM table_state_t = 0x200 ) type TableProgress struct { @@ -127,11 +131,18 @@ func (tpp *TableProgressPrinter) UpdateTotal(name string, total int, stopUpdate } } -func (tpp *TableProgressPrinter) RegisterTable(name string, isFailed bool, isDone bool) { +func (tpp *TableProgressPrinter) RegisterTable(name string, isFailed bool, isDone bool, isExist int) { var state table_state_t if isFailed { if isDone { - state = TABLE_STATE_RESULT_FAIL_STRUCTURE_DONE | TABLE_STATE_REGISTER + switch isExist { + case common.UpstreamTableLackFlag: + state = TABLE_STATE_NOT_EXSIT_UPSTREAM | TABLE_STATE_REGISTER + case common.DownstreamTableLackFlag: + state = TABLE_STATE_NOT_EXSIT_DOWNSTREAM | TABLE_STATE_REGISTER + default: + state = TABLE_STATE_RESULT_FAIL_STRUCTURE_DONE | TABLE_STATE_REGISTER + } } else { state = TABLE_STATE_RESULT_FAIL_STRUCTURE_CONTINUE | TABLE_STATE_REGISTER } @@ -181,6 +192,7 @@ func (tpp *TableProgressPrinter) PrintSummary() { tpp.tableNums, ) } else { + SkippedNum := 0 for p := tpp.tableFailList.Front(); p != nil; p = p.Next() { tp := p.Value.(*TableProgress) if tp.state&(TABLE_STATE_RESULT_FAIL_STRUCTURE_DONE|TABLE_STATE_RESULT_FAIL_STRUCTURE_CONTINUE) != 0 { @@ -189,10 +201,18 @@ func (tpp *TableProgressPrinter) PrintSummary() { if 
tp.state&(TABLE_STATE_RESULT_DIFFERENT) != 0 { fixStr = fmt.Sprintf("%sThe data of `%s` is not equal.\n", fixStr, tp.name) } + if tp.state&(TABLE_STATE_NOT_EXSIT_DOWNSTREAM) != 0 { + fixStr = fmt.Sprintf("%sThe data of `%s` does not exist in downstream database.\n", fixStr, tp.name) + SkippedNum++ + } + if tp.state&(TABLE_STATE_NOT_EXSIT_UPSTREAM) != 0 { + fixStr = fmt.Sprintf("%sThe data of `%s` does not exist in upstream database.\n", fixStr, tp.name) + SkippedNum++ + } } fixStr = fmt.Sprintf( - "%s\nThe rest of the tables are all equal.\nThe patch file has been generated to './output_dir/patch.sql'\nYou can view the comparison details through './output_dir/sync_diff_inspector.log'\n", - fixStr, + "%s\nThe rest of the tables are all equal.\nA total of %d tables have been compared, %d tables finished, %d tables failed, %d tables skipped.\nThe patch file has been generated to './output_dir/patch.sql'\nYou can view the comparison details through './output_dir/sync_diff_inspector.log'\n", + fixStr, tpp.tableNums, tpp.tableNums-tpp.tableFailList.Len(), tpp.tableFailList.Len()-SkippedNum, SkippedNum, ) } @@ -337,6 +357,13 @@ func (tpp *TableProgressPrinter) flush(stateIsChanged bool) { tpp.lines++ tpp.progressTableNums++ tp.state = TABLE_STATE_COMPARING + case TABLE_STATE_NOT_EXSIT_UPSTREAM, TABLE_STATE_NOT_EXSIT_DOWNSTREAM: + dynStr = fmt.Sprintf("%sComparing the table data of `%s` ...skipped\n", dynStr, tp.name) + tpp.tableFailList.PushBack(tp) + preNode := p.Prev() + tpp.tableList.Remove(p) + p = preNode + tpp.finishTableNums++ case TABLE_STATE_RESULT_FAIL_STRUCTURE_DONE: fixStr = fmt.Sprintf("%sComparing the table structure of `%s` ... 
failure\n", fixStr, tp.name) tpp.tableFailList.PushBack(tp) @@ -410,9 +437,9 @@ func UpdateTotal(name string, total int, stopUpdate bool) { } } -func RegisterTable(name string, isFailed bool, isDone bool) { +func RegisterTable(name string, isFailed bool, isDone bool, isExist int) { if progress_ != nil { - progress_.RegisterTable(name, isFailed, isDone) + progress_.RegisterTable(name, isFailed, isDone, isExist) } } diff --git a/sync_diff_inspector/progress/progress_test.go b/sync_diff_inspector/progress/progress_test.go index 6992483ad..bf25a9f50 100644 --- a/sync_diff_inspector/progress/progress_test.go +++ b/sync_diff_inspector/progress/progress_test.go @@ -19,17 +19,18 @@ import ( "testing" "time" + "github.com/pingcap/tidb-tools/sync_diff_inspector/source/common" "github.com/stretchr/testify/require" ) func TestProgress(t *testing.T) { - p := NewTableProgressPrinter(4, 0) - p.RegisterTable("1", true, true) + p := NewTableProgressPrinter(6, 0) + p.RegisterTable("1", true, true, common.AllTableExistFlag) p.StartTable("1", 50, true) - p.RegisterTable("2", true, false) + p.RegisterTable("2", true, false, common.AllTableExistFlag) p.StartTable("2", 2, true) p.Inc("2") - p.RegisterTable("3", false, false) + p.RegisterTable("3", false, false, common.AllTableExistFlag) p.StartTable("3", 1, false) p.Inc("2") p.Inc("3") @@ -39,6 +40,10 @@ func TestProgress(t *testing.T) { p.FailTable("4") p.Inc("3") p.Inc("4") + p.RegisterTable("5", true, true, common.UpstreamTableLackFlag) + p.StartTable("5", 1, true) + p.RegisterTable("6", true, true, common.DownstreamTableLackFlag) + p.StartTable("6", 1, true) time.Sleep(500 * time.Millisecond) p.Close() buffer := new(bytes.Buffer) @@ -47,18 +52,21 @@ func TestProgress(t *testing.T) { require.Equal( t, buffer.String(), - "\x1b[1A\x1b[J\nSummary:\n\nThe structure of `1` is not equal.\nThe structure of `2` is not equal.\nThe data of `4` is not equal.\n"+ - "\nThe rest of the tables are all equal.\nThe patch file has been generated to 
'./output_dir/patch.sql'\n"+ + "\x1b[1A\x1b[J\nSummary:\n\nThe structure of `1` is not equal.\nThe structure of `2` is not equal.\nThe data of `4` is not equal.\nThe data of `5` does not exist in upstream database.\nThe data of `6` does not exist in downstream database.\n"+ + "\nThe rest of the tables are all equal.\nA total of 6 tables have been compared, 1 tables finished, 3 tables failed, 2 tables skipped.\nThe patch file has been generated to './output_dir/patch.sql'\n"+ "You can view the comparison details through './output_dir/sync_diff_inspector.log'\n\n", ) } func TestTableError(t *testing.T) { p := NewTableProgressPrinter(4, 0) - p.RegisterTable("1", true, true) + p.RegisterTable("1", true, true, common.AllTableExistFlag) p.StartTable("1", 50, true) - p.RegisterTable("2", true, true) + p.RegisterTable("2", true, true, common.AllTableExistFlag) p.StartTable("2", 1, true) + p.RegisterTable("3", true, true, common.DownstreamTableLackFlag) + p.StartTable("3", 1, true) + p.Inc("2") buffer := new(bytes.Buffer) p.SetOutput(buffer) @@ -73,6 +81,9 @@ func TestTableError(t *testing.T) { "\x1b[2A\x1b[JComparing the table structure of `2` ... 
failure\n"+ "_____________________________________________________________________________\n"+ "Progress [==============================>------------------------------] 50% 0/0\n"+ + "\x1b[2A\x1b[JComparing the table data of `3` ...skipped\n"+ + "_____________________________________________________________________________\n"+ + "Progress [=============================================>---------------] 75% 0/1\n"+ "\x1b[1A\x1b[J\nError in comparison process:\n[aaa]\n\n"+ "You can view the comparison details through './output_dir/sync_diff_inspector.log'\n", ) @@ -80,9 +91,9 @@ func TestTableError(t *testing.T) { func TestAllSuccess(t *testing.T) { Init(2, 0) - RegisterTable("1", false, false) + RegisterTable("1", false, false, common.AllTableExistFlag) StartTable("1", 1, true) - RegisterTable("2", false, false) + RegisterTable("2", false, false, common.AllTableExistFlag) StartTable("2", 1, true) Inc("1") Inc("2") diff --git a/sync_diff_inspector/report/report.go b/sync_diff_inspector/report/report.go index 684e581c1..5e3aa208a 100644 --- a/sync_diff_inspector/report/report.go +++ b/sync_diff_inspector/report/report.go @@ -65,7 +65,7 @@ type TableResult struct { ChunkMap map[string]*ChunkResult `json:"chunk-result"` // `ChunkMap` stores the `ChunkResult` of each chunk of the table UpCount int64 `json:"up-count"` // `UpCount` is the number of rows in the table from upstream DownCount int64 `json:"down-count"` // `DownCount` is the number of rows in the table from downstream - + TableLack int `json:"table-lack"` } // ChunkResult save the necessarily information to provide summary information @@ -80,6 +80,7 @@ type Report struct { Result string `json:"-"` // Result is pass or fail PassNum int32 `json:"-"` // The pass number of tables FailedNum int32 `json:"-"` // The failed number of tables + SkippedNum int32 `json:"-"` // The skipped number of tables TableResults map[string]map[string]*TableResult `json:"table-results"` // TableResult saved the map of `schema` => 
`table` => `tableResult` StartTime time.Time `json:"start-time"` Duration time.Duration `json:"time-duration"` @@ -131,6 +132,11 @@ func (r *Report) getDiffRows() [][]string { } diffRow := make([]string, 0) diffRow = append(diffRow, dbutil.TableName(schema, table)) + if !common.AllTableExist(result.TableLack) { + diffRow = append(diffRow, "skipped") + } else { + diffRow = append(diffRow, "succeed") + } if !result.StructEqual { diffRow = append(diffRow, "false") } else { @@ -154,7 +160,6 @@ func (r *Report) CalculateTotalSize(ctx context.Context, db *sql.DB) { for schema, tableMap := range r.TableResults { for table := range tableMap { size, err := utils.GetTableSize(ctx, db, schema, table) - if size == 0 || err != nil { log.Warn("fail to get the correct size of table, if you want to get the correct size, please analyze the corresponding tables", zap.String("table", dbutil.TableName(schema, table)), zap.Error(err)) } else { @@ -166,11 +171,13 @@ func (r *Report) CalculateTotalSize(ctx context.Context, db *sql.DB) { // CommitSummary commit summary info func (r *Report) CommitSummary() error { - passNum, failedNum := int32(0), int32(0) + passNum, failedNum, skippedNum := int32(0), int32(0), int32(0) for _, tableMap := range r.TableResults { for _, result := range tableMap { if result.StructEqual && result.DataEqual { passNum++ + } else if !common.AllTableExist(result.TableLack) { + skippedNum++ } else { failedNum++ } @@ -178,6 +185,7 @@ func (r *Report) CommitSummary() error { } r.PassNum = passNum r.FailedNum = failedNum + r.SkippedNum = skippedNum summaryPath := filepath.Join(r.task.OutputDir, "summary.txt") summaryFile, err := os.Create(summaryPath) if err != nil { @@ -208,11 +216,11 @@ func (r *Report) CommitSummary() error { summaryFile.WriteString(tableString.String()) summaryFile.WriteString("\n\n") } - if r.Result == Fail { + if r.Result == Fail || r.SkippedNum != 0 { summaryFile.WriteString("The following tables contains inconsistent data\n\n") tableString := 
&strings.Builder{} table := tablewriter.NewWriter(tableString) - table.SetHeader([]string{"Table", "Structure equality", "Data diff rows", "UpCount", "DownCount"}) + table.SetHeader([]string{"Table", "Result", "Structure equality", "Data diff rows", "UpCount", "DownCount"}) diffRows := r.getDiffRows() for _, v := range diffRows { table.Append(v) @@ -228,26 +236,35 @@ func (r *Report) CommitSummary() error { func (r *Report) Print(w io.Writer) error { var summary strings.Builder - if r.Result == Pass { - summary.WriteString(fmt.Sprintf("A total of %d table have been compared and all are equal.\n", r.FailedNum+r.PassNum)) + if r.Result == Pass && r.SkippedNum == 0 { + summary.WriteString(fmt.Sprintf("A total of %d table have been compared and all are equal.\n", r.FailedNum+r.PassNum+r.SkippedNum)) summary.WriteString(fmt.Sprintf("You can view the comparision details through '%s/%s'\n", r.task.OutputDir, config.LogFileName)) - } else if r.Result == Fail { + } else if r.Result == Fail || r.SkippedNum != 0 { for schema, tableMap := range r.TableResults { for table, result := range tableMap { if !result.StructEqual { if result.DataSkip { - summary.WriteString(fmt.Sprintf("The structure of %s is not equal, and data-check is skipped\n", dbutil.TableName(schema, table))) + switch result.TableLack { + case common.UpstreamTableLackFlag: + summary.WriteString(fmt.Sprintf("The data of %s does not exist in upstream database\n", dbutil.TableName(schema, table))) + case common.DownstreamTableLackFlag: + summary.WriteString(fmt.Sprintf("The data of %s does not exist in downstream database\n", dbutil.TableName(schema, table))) + default: + summary.WriteString(fmt.Sprintf("The structure of %s is not equal, and data-check is skipped\n", dbutil.TableName(schema, table))) + } } else { summary.WriteString(fmt.Sprintf("The structure of %s is not equal\n", dbutil.TableName(schema, table))) } } - if !result.DataEqual { + if !result.DataEqual && common.AllTableExist(result.TableLack) { 
summary.WriteString(fmt.Sprintf("The data of %s is not equal\n", dbutil.TableName(schema, table))) } } } summary.WriteString("\n") summary.WriteString("The rest of tables are all equal.\n") + summary.WriteString("\n") + summary.WriteString(fmt.Sprintf("A total of %d tables have been compared, %d tables finished, %d tables failed, %d tables skipped.\n", r.FailedNum+r.PassNum+r.SkippedNum, r.PassNum, r.FailedNum, r.SkippedNum)) summary.WriteString(fmt.Sprintf("The patch file has been generated in \n\t'%s/'\n", r.task.FixDir)) summary.WriteString(fmt.Sprintf("You can view the comparision details through '%s/%s'\n", r.task.OutputDir, config.LogFileName)) } else { @@ -295,13 +312,14 @@ func (r *Report) Init(tableDiffs []*common.TableDiff, sourceConfig [][]byte, tar } // SetTableStructCheckResult sets the struct check result for table. -func (r *Report) SetTableStructCheckResult(schema, table string, equal bool, skip bool) { +func (r *Report) SetTableStructCheckResult(schema, table string, equal bool, skip bool, exist int) { r.Lock() defer r.Unlock() tableResult := r.TableResults[schema][table] tableResult.StructEqual = equal tableResult.DataSkip = skip - if !equal && r.Result != Error { + tableResult.TableLack = exist + if !equal && common.AllTableExist(tableResult.TableLack) && r.Result != Error { r.Result = Fail } } @@ -323,11 +341,11 @@ func (r *Report) SetTableDataCheckResult(schema, table string, equal bool, rowsA } result.ChunkMap[id.ToString()].RowsAdd += rowsAdd result.ChunkMap[id.ToString()].RowsDelete += rowsDelete - if r.Result != Error { + if r.Result != Error && common.AllTableExist(result.TableLack) { r.Result = Fail } } - if !equal && r.Result != Error { + if !equal && common.AllTableExist(result.TableLack) && r.Result != Error { r.Result = Fail } } diff --git a/sync_diff_inspector/report/report_test.go b/sync_diff_inspector/report/report_test.go index 9bfc324ab..9a3802009 100644 --- a/sync_diff_inspector/report/report_test.go +++ 
b/sync_diff_inspector/report/report_test.go @@ -70,6 +70,12 @@ func TestReport(t *testing.T) { Info: tableInfo2, Collation: "[123]", }, + { + Schema: "dtest", + Table: "atbl", + Info: tableInfo2, + Collation: "[123]", + }, } configs := []*ReportConfig{ { @@ -104,7 +110,7 @@ func TestReport(t *testing.T) { report.CalculateTotalSize(ctx, db) // Test Table Report - report.SetTableStructCheckResult("test", "tbl", true, false) + report.SetTableStructCheckResult("test", "tbl", true, false, common.AllTableExistFlag) report.SetTableDataCheckResult("test", "tbl", true, 100, 200, 222, 222, &chunk.ChunkID{1, 1, 1, 1, 2}) report.SetTableMeetError("test", "tbl", errors.New("eeee")) @@ -118,19 +124,21 @@ func TestReport(t *testing.T) { require.True(t, result.DataEqual) require.True(t, result.StructEqual) - require.Equal(t, new_report.getSortedTables(), [][]string{{"`atest`.`atbl`", "0", "0"}, {"`ctest`.`atbl`", "0", "0"}, {"`test`.`tbl`", "222", "222"}}) + require.Equal(t, new_report.getSortedTables(), [][]string{{"`atest`.`atbl`", "0", "0"}, {"`ctest`.`atbl`", "0", "0"}, {"`dtest`.`atbl`", "0", "0"}, {"`test`.`tbl`", "222", "222"}}) require.Equal(t, new_report.getDiffRows(), [][]string{}) - new_report.SetTableStructCheckResult("atest", "atbl", true, false) + new_report.SetTableStructCheckResult("atest", "atbl", true, false, common.AllTableExistFlag) new_report.SetTableDataCheckResult("atest", "atbl", false, 111, 222, 333, 333, &chunk.ChunkID{1, 1, 1, 1, 2}) - require.Equal(t, new_report.getSortedTables(), [][]string{{"`ctest`.`atbl`", "0", "0"}, {"`test`.`tbl`", "222", "222"}}) - require.Equal(t, new_report.getDiffRows(), [][]string{{"`atest`.`atbl`", "true", "+111/-222", "333", "333"}}) + require.Equal(t, new_report.getSortedTables(), [][]string{{"`ctest`.`atbl`", "0", "0"}, {"`dtest`.`atbl`", "0", "0"}, {"`test`.`tbl`", "222", "222"}}) + require.Equal(t, new_report.getDiffRows(), [][]string{{"`atest`.`atbl`", "succeed", "true", "+111/-222", "333", "333"}}) - 
new_report.SetTableStructCheckResult("atest", "atbl", false, false) - require.Equal(t, new_report.getSortedTables(), [][]string{{"`ctest`.`atbl`", "0", "0"}, {"`test`.`tbl`", "222", "222"}}) - require.Equal(t, new_report.getDiffRows(), [][]string{{"`atest`.`atbl`", "false", "+111/-222", "333", "333"}}) + new_report.SetTableStructCheckResult("atest", "atbl", false, false, common.AllTableExistFlag) + require.Equal(t, new_report.getSortedTables(), [][]string{{"`ctest`.`atbl`", "0", "0"}, {"`dtest`.`atbl`", "0", "0"}, {"`test`.`tbl`", "222", "222"}}) + require.Equal(t, new_report.getDiffRows(), [][]string{{"`atest`.`atbl`", "succeed", "false", "+111/-222", "333", "333"}}) - new_report.SetTableStructCheckResult("ctest", "atbl", false, true) + new_report.SetTableStructCheckResult("ctest", "atbl", false, true, common.AllTableExistFlag) + + new_report.SetTableStructCheckResult("dtest", "atbl", false, true, common.DownstreamTableLackFlag) buf := new(bytes.Buffer) new_report.Print(buf) @@ -138,8 +146,10 @@ func TestReport(t *testing.T) { require.Contains(t, info, "The structure of `atest`.`atbl` is not equal\n") require.Contains(t, info, "The data of `atest`.`atbl` is not equal\n") require.Contains(t, info, "The structure of `ctest`.`atbl` is not equal, and data-check is skipped\n") + require.Contains(t, info, "The data of `dtest`.`atbl` does not exist in downstream database\n") require.Contains(t, info, "\n"+ - "The rest of tables are all equal.\n"+ + "The rest of tables are all equal.\n\n"+ + "A total of 0 tables have been compared, 0 tables finished, 0 tables failed, 0 tables skipped.\n"+ "The patch file has been generated in \n\t'output_dir/123456/fix-on-tidb1/'\n"+ "You can view the comparision details through 'output_dir/sync_diff.log'\n") } @@ -245,7 +255,7 @@ func TestPrint(t *testing.T) { var buf *bytes.Buffer // All Pass - report.SetTableStructCheckResult("test", "tbl", true, false) + report.SetTableStructCheckResult("test", "tbl", true, false, 
common.AllTableExistFlag) report.SetTableDataCheckResult("test", "tbl", true, 0, 0, 22, 22, &chunk.ChunkID{0, 0, 0, 0, 1}) buf = new(bytes.Buffer) report.Print(buf) @@ -254,7 +264,7 @@ func TestPrint(t *testing.T) { // Error report.SetTableMeetError("test", "tbl1", errors.New("123")) - report.SetTableStructCheckResult("test", "tbl1", false, false) + report.SetTableStructCheckResult("test", "tbl1", false, false, common.AllTableExistFlag) buf = new(bytes.Buffer) report.Print(buf) require.Equal(t, buf.String(), "Error in comparison process:\n"+ @@ -319,17 +329,17 @@ func TestGetSnapshot(t *testing.T) { } report.Init(tableDiffs, configsBytes[:2], configsBytes[2]) - report.SetTableStructCheckResult("test", "tbl", true, false) + report.SetTableStructCheckResult("test", "tbl", true, false, common.AllTableExistFlag) report.SetTableDataCheckResult("test", "tbl", false, 100, 100, 200, 300, &chunk.ChunkID{0, 0, 0, 1, 10}) report.SetTableDataCheckResult("test", "tbl", true, 0, 0, 300, 300, &chunk.ChunkID{0, 0, 0, 3, 10}) report.SetTableDataCheckResult("test", "tbl", false, 200, 200, 400, 500, &chunk.ChunkID{0, 0, 0, 3, 10}) - report.SetTableStructCheckResult("atest", "tbl", true, false) + report.SetTableStructCheckResult("atest", "tbl", true, false, common.AllTableExistFlag) report.SetTableDataCheckResult("atest", "tbl", false, 100, 100, 500, 600, &chunk.ChunkID{0, 0, 0, 0, 10}) report.SetTableDataCheckResult("atest", "tbl", true, 0, 0, 600, 600, &chunk.ChunkID{0, 0, 0, 3, 10}) report.SetTableDataCheckResult("atest", "tbl", false, 200, 200, 700, 800, &chunk.ChunkID{0, 0, 0, 3, 10}) - report.SetTableStructCheckResult("xtest", "tbl", true, false) + report.SetTableStructCheckResult("xtest", "tbl", true, false, common.AllTableExistFlag) report.SetTableDataCheckResult("xtest", "tbl", false, 100, 100, 800, 900, &chunk.ChunkID{0, 0, 0, 0, 10}) report.SetTableDataCheckResult("xtest", "tbl", true, 0, 0, 900, 900, &chunk.ChunkID{0, 0, 0, 1, 10}) report.SetTableDataCheckResult("xtest", 
"tbl", false, 200, 200, 1000, 1100, &chunk.ChunkID{0, 0, 0, 3, 10}) @@ -390,7 +400,9 @@ func TestCommitSummary(t *testing.T) { createTableSQL3 := "create table `xtest`.`tbl`(`a` int, `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `b`))" tableInfo3, err := dbutil.GetTableInfoBySQL(createTableSQL3, parser.New()) require.NoError(t, err) - + createTableSQL4 := "create table `xtest`.`tb1`(`a` int, `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `b`))" + tableInfo4, err := dbutil.GetTableInfoBySQL(createTableSQL4, parser.New()) + require.NoError(t, err) tableDiffs := []*common.TableDiff{ { Schema: "test", @@ -412,6 +424,16 @@ func TestCommitSummary(t *testing.T) { Table: "tbl", Info: tableInfo3, Collation: "[123]", + }, { + Schema: "xtest", + Table: "tb1", + Info: tableInfo4, + Collation: "[123]", + }, { + Schema: "xtest", + Table: "tb2", + Info: tableInfo4, + Collation: "[123]", }, } configs := []*ReportConfig{ @@ -441,22 +463,28 @@ func TestCommitSummary(t *testing.T) { } report.Init(tableDiffs, configsBytes[:2], configsBytes[2]) - report.SetTableStructCheckResult("test", "tbl", true, false) + report.SetTableStructCheckResult("test", "tbl", true, false, common.AllTableExistFlag) report.SetTableDataCheckResult("test", "tbl", true, 100, 200, 400, 400, &chunk.ChunkID{0, 0, 0, 1, 10}) - report.SetTableStructCheckResult("atest", "tbl", true, false) + report.SetTableStructCheckResult("atest", "tbl", true, false, common.AllTableExistFlag) report.SetTableDataCheckResult("atest", "tbl", false, 100, 200, 500, 600, &chunk.ChunkID{0, 0, 0, 2, 10}) - report.SetTableStructCheckResult("xtest", "tbl", false, false) + report.SetTableStructCheckResult("xtest", "tbl", false, false, common.AllTableExistFlag) report.SetTableDataCheckResult("xtest", "tbl", false, 100, 200, 600, 700, &chunk.ChunkID{0, 0, 0, 3, 10}) + report.SetTableStructCheckResult("xtest", "tb1", false, true, common.UpstreamTableLackFlag) + report.SetTableDataCheckResult("xtest", "tb1", false, 0, 
200, 0, 200, &chunk.ChunkID{0, 0, 0, 4, 10}) + + report.SetTableStructCheckResult("xtest", "tb2", false, true, common.DownstreamTableLackFlag) + report.SetTableDataCheckResult("xtest", "tb2", false, 100, 0, 100, 0, &chunk.ChunkID{0, 0, 0, 5, 10}) + err = report.CommitSummary() require.NoError(t, err) filename := path.Join(outputDir, "summary.txt") file, err := os.Open(filename) require.NoError(t, err) - p := make([]byte, 1024) + p := make([]byte, 2048) file.Read(p) str := string(p) require.Contains(t, str, "Summary\n\n\n\n"+ @@ -480,13 +508,17 @@ func TestCommitSummary(t *testing.T) { "| `ytest`.`tbl` | 0 | 0 |\n"+ "+---------------+---------+-----------+\n\n\n"+ "The following tables contains inconsistent data\n\n"+ - "+---------------+--------------------+----------------+---------+-----------+\n"+ - "| TABLE | STRUCTURE EQUALITY | DATA DIFF ROWS | UPCOUNT | DOWNCOUNT |\n"+ - "+---------------+--------------------+----------------+---------+-----------+\n") + "+---------------+---------+--------------------+----------------+---------+-----------+\n"+ + "| TABLE | RESULT | STRUCTURE EQUALITY | DATA DIFF ROWS | UPCOUNT | DOWNCOUNT |\n"+ + "+---------------+---------+--------------------+----------------+---------+-----------+\n") + require.Contains(t, str, + "| `atest`.`tbl` | succeed | true | +100/-200 | 500 | 600 |\n") + require.Contains(t, str, + "| `xtest`.`tbl` | succeed | false | +100/-200 | 600 | 700 |\n") require.Contains(t, str, - "| `atest`.`tbl` | true | +100/-200 | 500 | 600 |\n") + "| `xtest`.`tb1` | skipped | false | +0/-200 | 0 | 200 |\n") require.Contains(t, str, - "| `xtest`.`tbl` | false | +100/-200 | 600 | 700 |\n") + "| `xtest`.`tb2` | skipped | false | +100/-0 | 100 | 0 |\n") file.Close() err = os.Remove(filename) diff --git a/sync_diff_inspector/source/chunks_iter.go b/sync_diff_inspector/source/chunks_iter.go index f7246eee5..ffdfb4de5 100644 --- a/sync_diff_inspector/source/chunks_iter.go +++ b/sync_diff_inspector/source/chunks_iter.go @@ 
-103,7 +103,7 @@ func (t *ChunksIterator) produceChunks(ctx context.Context, startRange *splitter for ; t.nextTableIndex < len(t.TableDiffs); t.nextTableIndex++ { curTableIndex := t.nextTableIndex // skip data-check, but still need to send a empty chunk to make checkpoint continuous - if t.TableDiffs[curTableIndex].IgnoreDataCheck { + if t.TableDiffs[curTableIndex].IgnoreDataCheck || !common.AllTableExist(t.TableDiffs[curTableIndex].TableLack) { pool.Apply(func() { table := t.TableDiffs[curTableIndex] progressID := dbutil.TableName(table.Schema, table.Table) diff --git a/sync_diff_inspector/source/common/table_diff.go b/sync_diff_inspector/source/common/table_diff.go index d6d1b9157..8f8588aa2 100644 --- a/sync_diff_inspector/source/common/table_diff.go +++ b/sync_diff_inspector/source/common/table_diff.go @@ -65,4 +65,19 @@ type TableDiff struct { Collation string `json:"collation"` ChunkSize int64 `json:"chunk-size"` + + // TableLack = 1: the table only exists downstream, + // TableLack = -1: the table only exists upstream, + // TableLack = 0: the table exists both upstream and downstream. 
+ TableLack int `json:"-"` +} + +const ( + AllTableExistFlag = 0 + DownstreamTableLackFlag = -1 + UpstreamTableLackFlag = 1 +) + +func AllTableExist(tableLack int) bool { + return tableLack == AllTableExistFlag } diff --git a/sync_diff_inspector/source/mysql_shard.go b/sync_diff_inspector/source/mysql_shard.go index 004b8195e..26789f289 100644 --- a/sync_diff_inspector/source/mysql_shard.go +++ b/sync_diff_inspector/source/mysql_shard.go @@ -69,7 +69,7 @@ func getMatchedSourcesForTable(sourceTablesMap map[string][]*common.TableShardSo log.Fatal("unreachable, source tables map shouldn't be nil.") } matchSources, ok := sourceTablesMap[utils.UniqueID(table.Schema, table.Table)] - if !ok { + if !ok && common.AllTableExist(table.TableLack) { log.Fatal("unreachable, no match source tables in mysql shard source.") } return matchSources @@ -138,6 +138,20 @@ func (s *MySQLSources) GetCountAndCrc32(ctx context.Context, tableRange *splitte } } +func (s *MySQLSources) GetCountForLackTable(ctx context.Context, tableRange *splitter.RangeInfo) int64 { + table := s.tableDiffs[tableRange.GetTableIndex()] + var totalCount int64 + + matchSources := getMatchedSourcesForTable(s.sourceTablesMap, table) + if matchSources != nil { + for _, ms := range matchSources { + count, _ := dbutil.GetRowCount(ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, "", nil) + totalCount += count + } + } + return totalCount +} + func (s *MySQLSources) GetTables() []*common.TableDiff { return s.tableDiffs } @@ -162,6 +176,10 @@ func (s *MySQLSources) GetRowsIterator(ctx context.Context, tableRange *splitter sourceRows := make(map[int]*sql.Rows) table := s.tableDiffs[tableRange.GetTableIndex()] + // for tables that do not exist upstream or downstream + if !common.AllTableExist(table.TableLack) { + return nil, nil + } matchSources := getMatchedSourcesForTable(s.sourceTablesMap, table) var rowsQuery string @@ -223,6 +241,10 @@ func (s *MySQLSources) GetSnapshot() string { func (s *MySQLSources) 
GetSourceStructInfo(ctx context.Context, tableIndex int) ([]*model.TableInfo, error) { tableDiff := s.GetTables()[tableIndex] + // for tables that do not exist upstream or downstream + if !common.AllTableExist(tableDiff.TableLack) { + return nil, nil + } tableSources := getMatchedSourcesForTable(s.sourceTablesMap, tableDiff) sourceTableInfos := make([]*model.TableInfo, len(tableSources)) for i, tableSource := range tableSources { @@ -282,7 +304,7 @@ func (ms *MultiSourceRowsIterator) Close() { } } -func NewMySQLSources(ctx context.Context, tableDiffs []*common.TableDiff, ds []*config.DataSource, threadCount int, f tableFilter.Filter) (Source, error) { +func NewMySQLSources(ctx context.Context, tableDiffs []*common.TableDiff, ds []*config.DataSource, threadCount int, f tableFilter.Filter, skipNonExistingTable bool) (Source, error) { sourceTablesMap := make(map[string][]*common.TableShardSource) // we should get the real table name // and real table row query from sourceDB. @@ -320,12 +342,12 @@ func NewMySQLSources(ctx context.Context, tableDiffs []*common.TableDiff, ds []* } } uniqueId := utils.UniqueID(targetSchema, targetTable) - // get all tables from all source db instance - if f.MatchTable(targetSchema, targetTable) { + isMatched := f.MatchTable(targetSchema, targetTable) + if isMatched { // if match the filter, we should respect it and check target has this table later. 
sourceTablesAfterRoute[uniqueId] = struct{}{} } - if _, ok := targetUniqueTableMap[uniqueId]; !ok { + if _, ok := targetUniqueTableMap[uniqueId]; !ok && !(isMatched && skipNonExistingTable) { continue } maxSourceRouteTableCount[uniqueId]++ @@ -355,7 +377,8 @@ func NewMySQLSources(ctx context.Context, tableDiffs []*common.TableDiff, ds []* } - if err := checkTableMatched(targetUniqueTableMap, sourceTablesAfterRoute); err != nil { + tableDiffs, err := checkTableMatched(tableDiffs, targetUniqueTableMap, sourceTablesAfterRoute, skipNonExistingTable) + if err != nil { return nil, errors.Annotatef(err, "please make sure the filter is correct.") } diff --git a/sync_diff_inspector/source/source.go b/sync_diff_inspector/source/source.go index 3ca96c57f..baa21e85d 100644 --- a/sync_diff_inspector/source/source.go +++ b/sync_diff_inspector/source/source.go @@ -85,6 +85,9 @@ type Source interface { // GetCountAndCrc32 gets the crc32 result and the count from given range. GetCountAndCrc32(context.Context, *splitter.RangeInfo) *ChecksumInfo + // GetCountForLackTable gets the count for tables that don't exist upstream or downstream. + GetCountForLackTable(context.Context, *splitter.RangeInfo) int64 + // GetRowsIterator gets the row data iterator from given range. GetRowsIterator(context.Context, *splitter.RangeInfo) (RowDataIterator, error) @@ -185,10 +188,6 @@ func NewSources(ctx context.Context, cfg *config.Config) (downstream Source, ups } } - if len(tableDiffs) == 0 { - return nil, nil, errors.Errorf("no table need to be compared") - } - // Sort TableDiff is important! // because we compare table one by one. sort.Slice(tableDiffs, func(i, j int) bool { @@ -204,18 +203,21 @@ func NewSources(ctx context.Context, cfg *config.Config) (downstream Source, ups bucketSpliterPool := utils.NewWorkerPool(uint(cfg.CheckThreadCount), "bucketIter") // for mysql_shard, it needs `cfg.CheckThreadCount` + `cfg.SplitThreadCount` at most, because it cannot use bucket. 
mysqlConnCount := cfg.CheckThreadCount + cfg.SplitThreadCount - upstream, err = buildSourceFromCfg(ctx, tableDiffs, mysqlConnCount, bucketSpliterPool, cfg.Task.TargetCheckTables, cfg.Task.SourceInstances...) + upstream, err = buildSourceFromCfg(ctx, tableDiffs, mysqlConnCount, bucketSpliterPool, cfg.SkipNonExistingTable, cfg.Task.TargetCheckTables, cfg.Task.SourceInstances...) if err != nil { return nil, nil, errors.Annotate(err, "from upstream") } - downstream, err = buildSourceFromCfg(ctx, tableDiffs, mysqlConnCount, bucketSpliterPool, cfg.Task.TargetCheckTables, cfg.Task.TargetInstance) + if len(upstream.GetTables()) == 0 { + return nil, nil, errors.Errorf("no table need to be compared") + } + downstream, err = buildSourceFromCfg(ctx, upstream.GetTables(), mysqlConnCount, bucketSpliterPool, cfg.SkipNonExistingTable, cfg.Task.TargetCheckTables, cfg.Task.TargetInstance) if err != nil { return nil, nil, errors.Annotate(err, "from downstream") } return downstream, upstream, nil } -func buildSourceFromCfg(ctx context.Context, tableDiffs []*common.TableDiff, connCount int, bucketSpliterPool *utils.WorkerPool, f tableFilter.Filter, dbs ...*config.DataSource) (Source, error) { +func buildSourceFromCfg(ctx context.Context, tableDiffs []*common.TableDiff, connCount int, bucketSpliterPool *utils.WorkerPool, skipNonExistingTable bool, f tableFilter.Filter, dbs ...*config.DataSource) (Source, error) { if len(dbs) < 1 { return nil, errors.Errorf("no db config detected") } @@ -226,12 +228,12 @@ func buildSourceFromCfg(ctx context.Context, tableDiffs []*common.TableDiff, con if ok { if len(dbs) == 1 { - return NewTiDBSource(ctx, tableDiffs, dbs[0], bucketSpliterPool, f) + return NewTiDBSource(ctx, tableDiffs, dbs[0], bucketSpliterPool, f, skipNonExistingTable) } else { log.Fatal("Don't support check table in multiple tidb instance, please specify one tidb instance.") } } - return NewMySQLSources(ctx, tableDiffs, dbs, connCount, f) + return NewMySQLSources(ctx, tableDiffs, dbs, 
connCount, f, skipNonExistingTable) } func getAutoSnapshotPosition(cfg *mysql.Config) (string, string, error) {
@@ -375,21 +377,107 @@ type RangeIterator interface {
 	Close()
 }
 
-func checkTableMatched(targetMap map[string]struct{}, sourceMap map[string]struct{}) error {
+// checkTableMatched cross-checks the target tables (targetMap) against the source tables
+// (sourceMap); the keys of both maps are compared directly with each other.
+//
+// With skipNonExistingTable unset, the first table found on only one side is reported as an
+// error. With it set, a target table that has no source is flagged UpstreamTableLackFlag in
+// place, and every source table that has no target is appended to tableDiffs flagged
+// DownstreamTableLackFlag. The (possibly extended) slice is returned.
+func checkTableMatched(tableDiffs []*common.TableDiff, targetMap map[string]struct{}, sourceMap map[string]struct{}, skipNonExistingTable bool) ([]*common.TableDiff, error) {
+	tableIndexMap := getIndexMapForTable(tableDiffs)
 	// check target exists but source not found
 	for tableDiff := range targetMap {
 		// target table have all passed in tableFilter
 		if _, ok := sourceMap[tableDiff]; !ok {
-			return errors.Errorf("the source has no table to be compared. target-table is `%s`", tableDiff)
+			if !skipNonExistingTable {
+				return tableDiffs, errors.Errorf("the source has no table to be compared. target-table is `%s`", tableDiff)
+			}
+			index, found := tableIndexMap[tableDiff]
+			if !found {
+				// a missing key would read as index 0 and flag an unrelated table
+				return tableDiffs, errors.Errorf("target-table `%s` is not in the table list", tableDiff)
+			}
+			// leave a lack flag that is already set untouched
+			if common.AllTableExist(tableDiffs[index].TableLack) {
+				tableDiffs[index].TableLack = common.UpstreamTableLackFlag
+				log.Info("the source has no table to be compared", zap.String("target-table", tableDiff))
+			}
 		}
 	}
 	// check source exists but target not found
+	var lackTableIDs []string
 	for tableDiff := range sourceMap {
 		// need check source table have passd in tableFilter here
 		if _, ok := targetMap[tableDiff]; !ok {
-			return errors.Errorf("the target has no table to be compared. source-table is `%s`", tableDiff)
+			if !skipNonExistingTable {
+				return tableDiffs, errors.Errorf("the target has no table to be compared. source-table is `%s`", tableDiff)
+			}
+			lackTableIDs = append(lackTableIDs, tableDiff)
 		}
 	}
-	log.Info("table match check passed!!")
-	return nil
+	// map iteration order is random; sort so the appended tables keep a stable order across runs
+	sort.Strings(lackTableIDs)
+	for _, id := range lackTableIDs {
+		schema, table, ok := splitUniqueID(id)
+		if !ok {
+			return tableDiffs, errors.Errorf("malformed table id %s", id)
+		}
+		tableDiffs = append(tableDiffs, &common.TableDiff{
+			Schema:    schema,
+			Table:     table,
+			TableLack: common.DownstreamTableLackFlag,
+		})
+		log.Info("the target has no table to be compared", zap.String("source-table", id))
+	}
+	log.Info("table match check finished")
+	return tableDiffs, nil
+}
+
+// splitUniqueID splits an ID of the form "`schema`.`table`" into schema and table; the bool
+// result is false when id does not have that form. NOTE(review): assumes utils.EscapeName
+// doubles embedded backticks - confirm against its implementation.
+func splitUniqueID(id string) (string, string, bool) {
+	schema, rest, ok := cutQuotedName(id)
+	if !ok || !strings.HasPrefix(rest, ".") {
+		return "", "", false
+	}
+	table, rest, ok := cutQuotedName(rest[1:])
+	if !ok || rest != "" {
+		return "", "", false
+	}
+	return schema, table, true
+}
+
+// cutQuotedName reads one backtick-quoted name from the start of s and returns the unquoted
+// name, the text after the closing backtick, and whether a complete quoted name was found.
+func cutQuotedName(s string) (string, string, bool) {
+	if !strings.HasPrefix(s, "`") {
+		return "", s, false
+	}
+	var name strings.Builder
+	for i := 1; i < len(s); i++ {
+		if s[i] != '`' {
+			name.WriteByte(s[i])
+			continue
+		}
+		if i+1 < len(s) && s[i+1] == '`' {
+			// a doubled backtick is an escaped backtick inside the name
+			name.WriteByte('`')
+			i++
+			continue
+		}
+		return name.String(), s[i+1:], true
+	}
+	// the closing backtick is missing
+	return "", s, false
+}
+
+// getIndexMapForTable maps the utils.UniqueID of every table in tableDiffs to its slice index.
+func getIndexMapForTable(tableDiffs []*common.TableDiff) map[string]int {
+	tableIndexMap := make(map[string]int, len(tableDiffs))
+	for i, tableDiff := range tableDiffs {
+		tableIndexMap[utils.UniqueID(tableDiff.Schema, tableDiff.Table)] = i
+	}
+	return tableIndexMap
 }
diff --git a/sync_diff_inspector/source/source_test.go b/sync_diff_inspector/source/source_test.go index 1bcc9fb01..0a7a1964d 100644 --- a/sync_diff_inspector/source/source_test.go +++ b/sync_diff_inspector/source/source_test.go @@ -144,7 +144,7 @@ func TestTiDBSource(t *testing.T) { f, err := filter.Parse([]string{"source_test.*"}) require.NoError(t, err) - tidb, err := NewTiDBSource(ctx, tableDiffs, &config.DataSource{Conn: conn}, utils.NewWorkerPool(1, "bucketIter"), f) + tidb, err := NewTiDBSource(ctx, tableDiffs, &config.DataSource{Conn: conn}, utils.NewWorkerPool(1, "bucketIter"), f, false) require.NoError(t, err) caseFn := []struct { @@ -304,7 +304,7 @@ func TestFallbackToRandomIfRangeIsSet(t *testing.T) { Range: "id < 10", // This should prevent using BucketIterator } - tidb, err := NewTiDBSource(ctx, []*common.TableDiff{table1}, &config.DataSource{Conn: conn}, utils.NewWorkerPool(1, "bucketIter"), f) + tidb, err := NewTiDBSource(ctx, []*common.TableDiff{table1}, &config.DataSource{Conn: conn}, utils.NewWorkerPool(1, "bucketIter"), f, false) require.NoError(t, err) analyze := tidb.GetTableAnalyzer() @@ -374,7 +374,7 @@ func TestMysqlShardSources(t *testing.T) { f, err := 
filter.Parse([]string{"source_test.*"}) require.NoError(t, err) - shard, err := NewMySQLSources(ctx, tableDiffs, cs, 4, f) + shard, err := NewMySQLSources(ctx, tableDiffs, cs, 4, f, false) require.NoError(t, err) for i := 0; i < len(dbs); i++ { @@ -501,7 +501,7 @@ func TestMysqlRouter(t *testing.T) { f, err := filter.Parse([]string{"*.*"}) require.NoError(t, err) - mysql, err := NewMySQLSources(ctx, tableDiffs, []*config.DataSource{ds}, 4, f) + mysql, err := NewMySQLSources(ctx, tableDiffs, []*config.DataSource{ds}, 4, f, false) require.NoError(t, err) // random splitter @@ -610,7 +610,7 @@ func TestTiDBRouter(t *testing.T) { f, err := filter.Parse([]string{"*.*"}) require.NoError(t, err) - tidb, err := NewTiDBSource(ctx, tableDiffs, ds, utils.NewWorkerPool(1, "bucketIter"), f) + tidb, err := NewTiDBSource(ctx, tableDiffs, ds, utils.NewWorkerPool(1, "bucketIter"), f, false) require.NoError(t, err) infoRows := sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow("test_t", "CREATE TABLE `source_test`.`test1` (`a` int, `b` varchar(24), `c` float, primary key(`a`, `b`))") mock.ExpectQuery("SHOW CREATE TABLE.*").WillReturnRows(infoRows) @@ -910,20 +910,39 @@ func TestInitTables(t *testing.T) { } func TestCheckTableMatched(t *testing.T) { + var tableDiffs []*common.TableDiff + tableDiffs = append(tableDiffs, &common.TableDiff{ + Schema: "test", + Table: "t1", + }) + tableDiffs = append(tableDiffs, &common.TableDiff{ + Schema: "test", + Table: "t2", + }) + tmap := make(map[string]struct{}) smap := make(map[string]struct{}) - tmap["1"] = struct{}{} - tmap["2"] = struct{}{} + smap["`test`.`t1`"] = struct{}{} + smap["`test`.`t2`"] = struct{}{} - smap["1"] = struct{}{} - smap["2"] = struct{}{} - require.NoError(t, checkTableMatched(tmap, smap)) + tmap["`test`.`t1`"] = struct{}{} + tmap["`test`.`t2`"] = struct{}{} - delete(smap, "1") - require.Contains(t, checkTableMatched(tmap, smap).Error(), "the source has no table to be compared. 
target-table") + tables, err := checkTableMatched(tableDiffs, tmap, smap, false) + require.NoError(t, err) - delete(tmap, "1") - smap["1"] = struct{}{} - require.Contains(t, checkTableMatched(tmap, smap).Error(), "the target has no table to be compared. source-table") + smap["`test`.`t3`"] = struct{}{} + tables, err = checkTableMatched(tableDiffs, tmap, smap, false) + require.Contains(t, err.Error(), "the target has no table to be compared. source-table is ``test`.`t3``") + + delete(smap, "`test`.`t2`") + tables, err = checkTableMatched(tableDiffs, tmap, smap, false) + require.Contains(t, err.Error(), "the source has no table to be compared. target-table is ``test`.`t2``") + + tables, err = checkTableMatched(tableDiffs, tmap, smap, true) + require.NoError(t, err) + require.Equal(t, 0, tables[0].TableLack) + require.Equal(t, 1, tables[1].TableLack) + require.Equal(t, -1, tables[2].TableLack) } diff --git a/sync_diff_inspector/source/tidb.go b/sync_diff_inspector/source/tidb.go index 86ebdf826..8c47bf060 100644 --- a/sync_diff_inspector/source/tidb.go +++ b/sync_diff_inspector/source/tidb.go
@@ -137,6 +137,23 @@ func (s *TiDBSource) GetCountAndCrc32(ctx context.Context, tableRange *splitter.
 	}
 }
 
+// GetCountForLackTable returns the row count of the source table matched to the table of
+// tableRange. It returns 0 when no source table matches or when the count query fails.
+func (s *TiDBSource) GetCountForLackTable(ctx context.Context, tableRange *splitter.RangeInfo) int64 {
+	table := s.tableDiffs[tableRange.GetTableIndex()]
+	matchSource := getMatchSource(s.sourceTableMap, table)
+	if matchSource == nil {
+		return 0
+	}
+	count, err := dbutil.GetRowCount(ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, "", nil)
+	if err != nil {
+		// the method cannot return an error, so report it instead of silently counting 0 rows
+		log.Warn("failed to count rows of " + utils.UniqueID(matchSource.OriginSchema, matchSource.OriginTable) + ": " + err.Error())
+		return 0
+	}
+	return count
+}
+
 func (s *TiDBSource) GetTables() []*common.TableDiff {
 	return s.tableDiffs
 }
@@ -194,7 +211,7 @@ func (s *TiDBSource) GetSnapshot() string { return s.snapshot } -func getSourceTableMap(ctx context.Context, tableDiffs []*common.TableDiff, ds *config.DataSource, f tableFilter.Filter) (map[string]*common.TableSource, error) { +func NewTiDBSource(ctx context.Context, tableDiffs []*common.TableDiff, ds *config.DataSource, bucketSpliterPool *utils.WorkerPool, f tableFilter.Filter, skipNonExistingTable bool) (Source, error) { sourceTableMap := make(map[string]*common.TableSource) log.Info("find router for tidb source") // we should get the real table name @@ -236,11 +253,12 @@ func getSourceTableMap(ctx context.Context, tableDiffs []*common.TableDiff, ds * } uniqueId := utils.UniqueID(targetSchema, targetTable) - if f.MatchTable(targetSchema, targetTable) { + isMatched := f.MatchTable(targetSchema, targetTable) + if isMatched { // if match the filter, we should respect it and check target has this table later. sourceTablesAfterRoute[uniqueId] = struct{}{} } - if _, ok := targetUniqueTableMap[uniqueId]; ok { + if _, ok := targetUniqueTableMap[uniqueId]; ok || (isMatched && skipNonExistingTable) { if _, ok := sourceTableMap[uniqueId]; ok { log.Error("TiDB source don't support compare multiple source tables with one downstream table," + " if this happening when diff on same instance is fine. 
otherwise we are not guarantee this diff result is right") @@ -253,16 +271,9 @@ func getSourceTableMap(ctx context.Context, tableDiffs []*common.TableDiff, ds * } } - if err = checkTableMatched(targetUniqueTableMap, sourceTablesAfterRoute); err != nil { - return nil, errors.Annotatef(err, "please make sure the filter is correct.") - } - return sourceTableMap, nil -} - -func NewTiDBSource(ctx context.Context, tableDiffs []*common.TableDiff, ds *config.DataSource, bucketSpliterPool *utils.WorkerPool, f tableFilter.Filter) (Source, error) { - sourceTableMap, err := getSourceTableMap(ctx, tableDiffs, ds, f) + tableDiffs, err = checkTableMatched(tableDiffs, targetUniqueTableMap, sourceTablesAfterRoute, skipNonExistingTable) if err != nil { - return nil, errors.Trace(err) + return nil, errors.Annotatef(err, "please make sure the filter is correct.") } ts := &TiDBSource{ tableDiffs: tableDiffs, diff --git a/sync_diff_inspector/utils/utils.go b/sync_diff_inspector/utils/utils.go index ed27b0917..68e0ad6de 100644 --- a/sync_diff_inspector/utils/utils.go +++ b/sync_diff_inspector/utils/utils.go @@ -909,7 +909,7 @@ func ResetColumns(tableInfo *model.TableInfo, columns []string) (*model.TableInf return tableInfo, hasTimeStampType } -// UniqueID returns `schema:table` +// UniqueID returns `schema`.`table` func UniqueID(schema string, table string) string { // QuoteSchema quotes a full table name return fmt.Sprintf("`%s`.`%s`", EscapeName(schema), EscapeName(table)) diff --git a/tests/sync_diff_inspector/table_skip/config_base.toml b/tests/sync_diff_inspector/table_skip/config_base.toml new file mode 100644 index 000000000..ba3410073 --- /dev/null +++ b/tests/sync_diff_inspector/table_skip/config_base.toml @@ -0,0 +1,49 @@ +# Diff Configuration. 
+ +######################### Global config ######################### + +# how many goroutines are created to check data +check-thread-count = 4 + +# set false if just want compare data by checksum, will skip select data when checksum is not equal. +# set true if want compare all different rows, will slow down the total compare time. +export-fix-sql = true + +# ignore check table's data +check-struct-only = false + +# skip validation for tables that don't exist upstream or downstream +skip-non-existing-table = true + +######################### Databases config ######################### +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1"#MYSQL_HOST + port = 3306#MYSQL_PORT + user = "root" + password = "" + # remove comment if use tidb's snapshot data + # snapshot = "2016-10-08 16:45:26" + +[data-sources.tidb] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + # remove comment if use tidb's snapshot data + # snapshot = "2016-10-08 16:45:26" + +######################### Task config ######################### +[task] + # 1 fix sql: fix-target-TIDB1.sql + # 2 log: sync-diff.log + # 3 summary: summary.txt + # 4 checkpoint: a dir + output-dir = "/tmp/tidb_tools_test/sync_diff_inspector/output" + + source-instances = ["mysql1"] + + target-instance = "tidb" + + # tables need to check. + target-check-tables = ["skip_test.t*"] \ No newline at end of file diff --git a/tests/sync_diff_inspector/table_skip/config_router.toml b/tests/sync_diff_inspector/table_skip/config_router.toml new file mode 100644 index 000000000..2fa4ededf --- /dev/null +++ b/tests/sync_diff_inspector/table_skip/config_router.toml @@ -0,0 +1,61 @@ +# Diff Configuration. + +######################### Global config ######################### + +# The number of goroutines created to check data. 
The number of connections between upstream and downstream databases are slightly greater than this value +check-thread-count = 4 + +# If enabled, SQL statements is exported to fix inconsistent tables +export-fix-sql = true + +# Only compares the table structure instead of the data +check-struct-only = false + +# skip validation for tables that don't exist upstream or downstream +skip-non-existing-table = true +######################### Datasource config ######################### +[data-sources.mysql1] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" + + route-rules = ["rule1"] + +[data-sources.mysql2] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" + + route-rules = ["rule2"] + +[data-sources.tidb0] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +########################### Routes ########################### +[routes.rule1] +schema-pattern = "skip_test" # Matches the schema name of the data source. Supports the wildcards "*" and "?" +table-pattern = "t[1-2]" # Matches the table name of the data source. Supports the wildcards "*" and "?" +target-schema = "skip_test" # The name of the schema in the target database +target-table = "t5" # The name of the target table + +[routes.rule2] +schema-pattern = "skip_test" +table-pattern = "t0" +target-schema = "skip_test" +target-table = "t5" + +######################### Task config ######################### +[task] + output-dir = "/tmp/tidb_tools_test/sync_diff_inspector/output" + + source-instances = ["mysql1", "mysql2"] + + target-instance = "tidb0" + + # The tables of downstream databases to be compared. Each table needs to contain the schema name and the table name, separated by '.' 
+ target-check-tables = ["skip_test.t5"] \ No newline at end of file diff --git a/tests/sync_diff_inspector/table_skip/data.sql b/tests/sync_diff_inspector/table_skip/data.sql new file mode 100644 index 000000000..34a2a7452 --- /dev/null +++ b/tests/sync_diff_inspector/table_skip/data.sql @@ -0,0 +1,5 @@ +create database if not exists skip_test; +create table skip_test.t0 (a int, b int, primary key(a)); +create table skip_test.t1 (a int, b int, primary key(a)); +insert into skip_test.t0 values (1,1); +insert into skip_test.t1 values (2,2); \ No newline at end of file diff --git a/tests/sync_diff_inspector/table_skip/run.sh b/tests/sync_diff_inspector/table_skip/run.sh new file mode 100644 index 000000000..f0ed04cef --- /dev/null +++ b/tests/sync_diff_inspector/table_skip/run.sh @@ -0,0 +1,65 @@ +#!/bin/sh + +set -ex + +cd "$(dirname "$0")" + +OUT_DIR=/tmp/tidb_tools_test/sync_diff_inspector/output +rm -rf $OUT_DIR +mkdir -p $OUT_DIR + +mysql -uroot -h ${MYSQL_HOST} -P ${MYSQL_PORT} < ./data.sql + +# tidb +mysql -uroot -h 127.0.0.1 -P 4000 < ./data.sql + +sed "s/\"127.0.0.1\"#MYSQL_HOST/\"${MYSQL_HOST}\"/g" ./config_base.toml | sed "s/3306#MYSQL_PORT/${MYSQL_PORT}/g" > ./config.toml + +echo "compare tables, check result should be pass" +sync_diff_inspector --config=./config.toml > $OUT_DIR/table_skip_diff.output || true +check_contains "check pass!!!" 
$OUT_DIR/sync_diff.log +rm -rf $OUT_DIR/* + +echo "make some tables exist only upstream or downstream" +mysql -uroot -h ${MYSQL_HOST} -P ${MYSQL_PORT} -e "create table skip_test.t2 (a int, b int, primary key(a));" +mysql -uroot -h ${MYSQL_HOST} -P ${MYSQL_PORT} -e "insert into skip_test.t2 values (3,3);" +mysql -uroot -h 127.0.0.1 -P 4000 -e "create table skip_test.t3 (a int, b int, primary key(a));" +mysql -uroot -h 127.0.0.1 -P 4000 -e "insert into skip_test.t3 values (1,1);" +sync_diff_inspector --config=./config.toml > $OUT_DIR/table_skip_diff.output || true +check_contains "check pass" $OUT_DIR/sync_diff.log +check_contains "Comparing the table data of \`\`skip_test\`.\`t2\`\` ...skipped" $OUT_DIR/table_skip_diff.output +check_contains "Comparing the table data of \`\`skip_test\`.\`t3\`\` ...skipped" $OUT_DIR/table_skip_diff.output +check_contains "The data of \`skip_test\`.\`t2\` does not exist in downstream database" $OUT_DIR/table_skip_diff.output +check_contains "The data of \`skip_test\`.\`t3\` does not exist in upstream database" $OUT_DIR/table_skip_diff.output +check_contains "| TABLE | RESULT | STRUCTURE EQUALITY | DATA DIFF ROWS | UPCOUNT | DOWNCOUNT |" $OUT_DIR/summary.txt +check_contains "| \`skip_test\`.\`t2\` | skipped | false | +1/-0 | 1 | 0 |" $OUT_DIR/summary.txt +check_contains "| \`skip_test\`.\`t3\` | skipped | false | +0/-1 | 0 | 1 |" $OUT_DIR/summary.txt +rm -rf $OUT_DIR/* + +echo "make some table data not equal" +mysql -uroot -h 127.0.0.1 -P 4000 -e "insert into skip_test.t1 values (4,4);" +sync_diff_inspector --config=./config.toml > $OUT_DIR/table_skip_diff.output || true +check_contains "check failed" $OUT_DIR/sync_diff.log +check_contains "| \`skip_test\`.\`t1\` | succeed | true | +0/-1 | 1 | 2 |" $OUT_DIR/summary.txt +rm -rf $OUT_DIR/* + +echo "make some table structure not equal" +mysql -uroot -h ${MYSQL_HOST} -P ${MYSQL_PORT} -e "create table skip_test.t4 (a int, b int, c int,primary key(a));" +mysql -uroot -h ${MYSQL_HOST} -P 
${MYSQL_PORT} -e "insert into skip_test.t4 values (1,1,1);" +mysql -uroot -h 127.0.0.1 -P 4000 -e "create table skip_test.t4 (a int, b int, primary key(a));" +sync_diff_inspector --config=./config.toml > $OUT_DIR/table_skip_diff.output || true +check_contains "check failed" $OUT_DIR/sync_diff.log +check_contains "| \`skip_test\`.\`t4\` | succeed | false | +0/-0 | 0 | 0 |" $OUT_DIR/summary.txt +check_contains "A total of 5 tables have been compared, 1 tables finished, 2 tables failed, 2 tables skipped" $OUT_DIR/table_skip_diff.output +cat $OUT_DIR/summary.txt +rm -rf $OUT_DIR/* + +echo "test router case" +sync_diff_inspector --config=./config_router.toml > $OUT_DIR/table_skip_diff.output || true +check_contains "check pass" $OUT_DIR/sync_diff.log +check_contains "| \`skip_test\`.\`t5\` | skipped | false | +3/-0 | 3 | 0 |" $OUT_DIR/summary.txt +check_contains "The data of \`skip_test\`.\`t5\` does not exist in downstream database" $OUT_DIR/table_skip_diff.output +check_contains "A total of 1 tables have been compared, 0 tables finished, 0 tables failed, 1 tables skipped" $OUT_DIR/table_skip_diff.output +rm -rf $OUT_DIR/* + +echo "table_skip test passed" \ No newline at end of file