Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync-diff-inspector: skip validation for tables that exist only upstream or downstream and print skipped information in summary and progress #693

Merged
merged 21 commits into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 30 additions & 8 deletions sync_diff_inspector/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,19 @@ func (df *Diff) StructEqual(ctx context.Context) error {
tableIndex = df.startRange.ChunkRange.Index.TableIndex
}
for ; tableIndex < len(tables); tableIndex++ {
isEqual, isSkip, err := df.compareStruct(ctx, tableIndex)
if err != nil {
return errors.Trace(err)
var isEqual, isSkip bool
isAllExist := tables[tableIndex].NeedSkippedTable
if isAllExist == 0 {
var err error
isEqual, isSkip, err = df.compareStruct(ctx, tableIndex)
if err != nil {
return errors.Trace(err)
}
} else {
isEqual, isSkip = false, true
}
progress.RegisterTable(dbutil.TableName(tables[tableIndex].Schema, tables[tableIndex].Table), !isEqual, isSkip)
df.report.SetTableStructCheckResult(tables[tableIndex].Schema, tables[tableIndex].Table, isEqual, isSkip)
progress.RegisterTable(dbutil.TableName(tables[tableIndex].Schema, tables[tableIndex].Table), !isEqual, isSkip, isAllExist)
df.report.SetTableStructCheckResult(tables[tableIndex].Schema, tables[tableIndex].Table, isEqual, isSkip, isAllExist)
}
return nil
}
Expand Down Expand Up @@ -411,12 +418,28 @@ func (df *Diff) consume(ctx context.Context, rangeInfo *splitter.RangeInfo) bool
node: rangeInfo.ToNode(),
}
defer func() { df.sqlCh <- dml }()
tableDiff := df.downstream.GetTables()[rangeInfo.GetTableIndex()]
schema, table := tableDiff.Schema, tableDiff.Table
id := rangeInfo.ChunkRange.Index
if rangeInfo.ChunkRange.Type == chunk.Empty {
dml.node.State = checkpoints.IgnoreState
// for table only exists upstream
if tableDiff.NeedSkippedTable == -1 {
upCount, _ := dbutil.GetRowCount(ctx, df.upstream.GetDB(), schema, table, "", nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the upstream has multiple MySQL sources?

isEqual := false
df.report.SetTableDataCheckResult(schema, table, false, int(upCount), 0, upCount, 0, id)
return isEqual
}
// for table only exists downstream
if tableDiff.NeedSkippedTable == 1 {
downCount, _ := dbutil.GetRowCount(ctx, df.downstream.GetDB(), schema, table, "", nil)
isEqual := false
df.report.SetTableDataCheckResult(schema, table, isEqual, 0, int(downCount), 0, downCount, id)
return isEqual
}
return true
}
tableDiff := df.downstream.GetTables()[rangeInfo.GetTableIndex()]
schema, table := tableDiff.Schema, tableDiff.Table

var state string = checkpoints.SuccessState

isEqual, upCount, downCount, err := df.compareChecksumAndGetCount(ctx, rangeInfo)
Expand Down Expand Up @@ -447,7 +470,6 @@ func (df *Diff) consume(ctx context.Context, rangeInfo *splitter.RangeInfo) bool
isEqual = isDataEqual
}
dml.node.State = state
id := rangeInfo.ChunkRange.Index
df.report.SetTableDataCheckResult(schema, table, isEqual, dml.rowAdd, dml.rowDelete, upCount, downCount, id)
return isEqual
}
Expand Down
6 changes: 3 additions & 3 deletions sync_diff_inspector/progress/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (tpp *TableProgressPrinter) UpdateTotal(name string, total int, stopUpdate
}
}

func (tpp *TableProgressPrinter) RegisterTable(name string, isFailed bool, isDone bool) {
func (tpp *TableProgressPrinter) RegisterTable(name string, isFailed bool, isDone bool, isAllExist int) {
var state table_state_t
if isFailed {
if isDone {
Expand Down Expand Up @@ -410,9 +410,9 @@ func UpdateTotal(name string, total int, stopUpdate bool) {
}
}

func RegisterTable(name string, isFailed bool, isDone bool) {
func RegisterTable(name string, isFailed bool, isDone bool, isAllExist int) {
if progress_ != nil {
progress_.RegisterTable(name, isFailed, isDone)
progress_.RegisterTable(name, isFailed, isDone, isAllExist)
}
}

Expand Down
14 changes: 7 additions & 7 deletions sync_diff_inspector/progress/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ import (

func TestProgress(t *testing.T) {
p := NewTableProgressPrinter(4, 0)
p.RegisterTable("1", true, true)
p.RegisterTable("1", true, true, 0)
p.StartTable("1", 50, true)
p.RegisterTable("2", true, false)
p.RegisterTable("2", true, false, 0)
p.StartTable("2", 2, true)
p.Inc("2")
p.RegisterTable("3", false, false)
p.RegisterTable("3", false, false, 0)
p.StartTable("3", 1, false)
p.Inc("2")
p.Inc("3")
Expand All @@ -55,9 +55,9 @@ func TestProgress(t *testing.T) {

func TestTableError(t *testing.T) {
p := NewTableProgressPrinter(4, 0)
p.RegisterTable("1", true, true)
p.RegisterTable("1", true, true, 0)
p.StartTable("1", 50, true)
p.RegisterTable("2", true, true)
p.RegisterTable("2", true, true, 0)
p.StartTable("2", 1, true)
p.Inc("2")
buffer := new(bytes.Buffer)
Expand All @@ -80,9 +80,9 @@ func TestTableError(t *testing.T) {

func TestAllSuccess(t *testing.T) {
Init(2, 0)
RegisterTable("1", false, false)
RegisterTable("1", false, false, 0)
StartTable("1", 1, true)
RegisterTable("2", false, false)
RegisterTable("2", false, false, 0)
StartTable("2", 1, true)
Inc("1")
Inc("2")
Expand Down
34 changes: 20 additions & 14 deletions sync_diff_inspector/report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,16 @@ type ReportConfig struct {

// TableResult saves the check result for every table.
type TableResult struct {
Schema string `json:"schema"`
Table string `json:"table"`
StructEqual bool `json:"struct-equal"`
DataSkip bool `json:"data-skip"`
DataEqual bool `json:"data-equal"`
MeetError error `json:"-"`
ChunkMap map[string]*ChunkResult `json:"chunk-result"` // `ChunkMap` stores the `ChunkResult` of each chunk of the table
UpCount int64 `json:"up-count"` // `UpCount` is the number of rows in the table from upstream
DownCount int64 `json:"down-count"` // `DownCount` is the number of rows in the table from downstream

Schema string `json:"schema"`
Table string `json:"table"`
StructEqual bool `json:"struct-equal"`
DataSkip bool `json:"data-skip"`
DataEqual bool `json:"data-equal"`
MeetError error `json:"-"`
ChunkMap map[string]*ChunkResult `json:"chunk-result"` // `ChunkMap` stores the `ChunkResult` of each chunk of the table
UpCount int64 `json:"up-count"` // `UpCount` is the number of rows in the table from upstream
DownCount int64 `json:"down-count"` // `DownCount` is the number of rows in the table from downstream
TableSkipped int `json:"table-skipped"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe name this field `TableLack` to distinguish it from `DataSkip`?

}

// ChunkResult save the necessarily information to provide summary information
Expand Down Expand Up @@ -131,6 +131,11 @@ func (r *Report) getDiffRows() [][]string {
}
diffRow := make([]string, 0)
diffRow = append(diffRow, dbutil.TableName(schema, table))
if result.TableSkipped != 0 {
diffRow = append(diffRow, "skipped")
} else {
diffRow = append(diffRow, "succeed")
}
if !result.StructEqual {
diffRow = append(diffRow, "false")
} else {
Expand All @@ -154,8 +159,8 @@ func (r *Report) CalculateTotalSize(ctx context.Context, db *sql.DB) {
for schema, tableMap := range r.TableResults {
for table := range tableMap {
size, err := utils.GetTableSize(ctx, db, schema, table)

if size == 0 || err != nil {
tableSkipped := r.TableResults[schema][table].TableSkipped
if (size == 0 || err != nil) && tableSkipped == 0 {
log.Warn("fail to get the correct size of table, if you want to get the correct size, please analyze the corresponding tables", zap.String("table", dbutil.TableName(schema, table)), zap.Error(err))
} else {
r.TotalSize += size
Expand Down Expand Up @@ -212,7 +217,7 @@ func (r *Report) CommitSummary() error {
summaryFile.WriteString("The following tables contains inconsistent data\n\n")
tableString := &strings.Builder{}
table := tablewriter.NewWriter(tableString)
table.SetHeader([]string{"Table", "Structure equality", "Data diff rows", "UpCount", "DownCount"})
table.SetHeader([]string{"Table", "RESULT", "Structure equality", "Data diff rows", "UpCount", "DownCount"})
diffRows := r.getDiffRows()
for _, v := range diffRows {
table.Append(v)
Expand Down Expand Up @@ -295,12 +300,13 @@ func (r *Report) Init(tableDiffs []*common.TableDiff, sourceConfig [][]byte, tar
}

// SetTableStructCheckResult sets the struct check result for table.
func (r *Report) SetTableStructCheckResult(schema, table string, equal bool, skip bool) {
func (r *Report) SetTableStructCheckResult(schema, table string, equal bool, skip bool, exist int) {
r.Lock()
defer r.Unlock()
tableResult := r.TableResults[schema][table]
tableResult.StructEqual = equal
tableResult.DataSkip = skip
tableResult.TableSkipped = exist
if !equal && r.Result != Error {
r.Result = Fail
}
Expand Down
64 changes: 43 additions & 21 deletions sync_diff_inspector/report/report_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestReport(t *testing.T) {
report.CalculateTotalSize(ctx, db)

// Test Table Report
report.SetTableStructCheckResult("test", "tbl", true, false)
report.SetTableStructCheckResult("test", "tbl", true, false, 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

report.SetTableDataCheckResult("test", "tbl", true, 100, 200, 222, 222, &chunk.ChunkID{1, 1, 1, 1, 2})
report.SetTableMeetError("test", "tbl", errors.New("eeee"))

Expand All @@ -121,16 +121,16 @@ func TestReport(t *testing.T) {
require.Equal(t, new_report.getSortedTables(), [][]string{{"`atest`.`atbl`", "0", "0"}, {"`ctest`.`atbl`", "0", "0"}, {"`test`.`tbl`", "222", "222"}})
require.Equal(t, new_report.getDiffRows(), [][]string{})

new_report.SetTableStructCheckResult("atest", "atbl", true, false)
new_report.SetTableStructCheckResult("atest", "atbl", true, false, 0)
new_report.SetTableDataCheckResult("atest", "atbl", false, 111, 222, 333, 333, &chunk.ChunkID{1, 1, 1, 1, 2})
require.Equal(t, new_report.getSortedTables(), [][]string{{"`ctest`.`atbl`", "0", "0"}, {"`test`.`tbl`", "222", "222"}})
require.Equal(t, new_report.getDiffRows(), [][]string{{"`atest`.`atbl`", "true", "+111/-222", "333", "333"}})
require.Equal(t, new_report.getDiffRows(), [][]string{{"`atest`.`atbl`", "succeed", "true", "+111/-222", "333", "333"}})

new_report.SetTableStructCheckResult("atest", "atbl", false, false)
new_report.SetTableStructCheckResult("atest", "atbl", false, false, 0)
require.Equal(t, new_report.getSortedTables(), [][]string{{"`ctest`.`atbl`", "0", "0"}, {"`test`.`tbl`", "222", "222"}})
require.Equal(t, new_report.getDiffRows(), [][]string{{"`atest`.`atbl`", "false", "+111/-222", "333", "333"}})
require.Equal(t, new_report.getDiffRows(), [][]string{{"`atest`.`atbl`", "succeed", "false", "+111/-222", "333", "333"}})

new_report.SetTableStructCheckResult("ctest", "atbl", false, true)
new_report.SetTableStructCheckResult("ctest", "atbl", false, true, 0)

buf := new(bytes.Buffer)
new_report.Print(buf)
Expand Down Expand Up @@ -245,7 +245,7 @@ func TestPrint(t *testing.T) {

var buf *bytes.Buffer
// All Pass
report.SetTableStructCheckResult("test", "tbl", true, false)
report.SetTableStructCheckResult("test", "tbl", true, false, 0)
report.SetTableDataCheckResult("test", "tbl", true, 0, 0, 22, 22, &chunk.ChunkID{0, 0, 0, 0, 1})
buf = new(bytes.Buffer)
report.Print(buf)
Expand All @@ -254,7 +254,7 @@ func TestPrint(t *testing.T) {

// Error
report.SetTableMeetError("test", "tbl1", errors.New("123"))
report.SetTableStructCheckResult("test", "tbl1", false, false)
report.SetTableStructCheckResult("test", "tbl1", false, false, 0)
buf = new(bytes.Buffer)
report.Print(buf)
require.Equal(t, buf.String(), "Error in comparison process:\n"+
Expand Down Expand Up @@ -319,17 +319,17 @@ func TestGetSnapshot(t *testing.T) {
}
report.Init(tableDiffs, configsBytes[:2], configsBytes[2])

report.SetTableStructCheckResult("test", "tbl", true, false)
report.SetTableStructCheckResult("test", "tbl", true, false, 0)
report.SetTableDataCheckResult("test", "tbl", false, 100, 100, 200, 300, &chunk.ChunkID{0, 0, 0, 1, 10})
report.SetTableDataCheckResult("test", "tbl", true, 0, 0, 300, 300, &chunk.ChunkID{0, 0, 0, 3, 10})
report.SetTableDataCheckResult("test", "tbl", false, 200, 200, 400, 500, &chunk.ChunkID{0, 0, 0, 3, 10})

report.SetTableStructCheckResult("atest", "tbl", true, false)
report.SetTableStructCheckResult("atest", "tbl", true, false, 0)
report.SetTableDataCheckResult("atest", "tbl", false, 100, 100, 500, 600, &chunk.ChunkID{0, 0, 0, 0, 10})
report.SetTableDataCheckResult("atest", "tbl", true, 0, 0, 600, 600, &chunk.ChunkID{0, 0, 0, 3, 10})
report.SetTableDataCheckResult("atest", "tbl", false, 200, 200, 700, 800, &chunk.ChunkID{0, 0, 0, 3, 10})

report.SetTableStructCheckResult("xtest", "tbl", true, false)
report.SetTableStructCheckResult("xtest", "tbl", true, false, 0)
report.SetTableDataCheckResult("xtest", "tbl", false, 100, 100, 800, 900, &chunk.ChunkID{0, 0, 0, 0, 10})
report.SetTableDataCheckResult("xtest", "tbl", true, 0, 0, 900, 900, &chunk.ChunkID{0, 0, 0, 1, 10})
report.SetTableDataCheckResult("xtest", "tbl", false, 200, 200, 1000, 1100, &chunk.ChunkID{0, 0, 0, 3, 10})
Expand Down Expand Up @@ -390,7 +390,9 @@ func TestCommitSummary(t *testing.T) {
createTableSQL3 := "create table `xtest`.`tbl`(`a` int, `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `b`))"
tableInfo3, err := dbutil.GetTableInfoBySQL(createTableSQL3, parser.New())
require.NoError(t, err)

createTableSQL4 := "create table `xtest`.`tb1`(`a` int, `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `b`))"
tableInfo4, err := dbutil.GetTableInfoBySQL(createTableSQL4, parser.New())
require.NoError(t, err)
tableDiffs := []*common.TableDiff{
{
Schema: "test",
Expand All @@ -412,6 +414,16 @@ func TestCommitSummary(t *testing.T) {
Table: "tbl",
Info: tableInfo3,
Collation: "[123]",
}, {
Schema: "xtest",
Table: "tb1",
Info: tableInfo4,
Collation: "[123]",
}, {
Schema: "xtest",
Table: "tb2",
Info: tableInfo4,
Collation: "[123]",
},
}
configs := []*ReportConfig{
Expand Down Expand Up @@ -441,22 +453,28 @@ func TestCommitSummary(t *testing.T) {
}
report.Init(tableDiffs, configsBytes[:2], configsBytes[2])

report.SetTableStructCheckResult("test", "tbl", true, false)
report.SetTableStructCheckResult("test", "tbl", true, false, 0)
report.SetTableDataCheckResult("test", "tbl", true, 100, 200, 400, 400, &chunk.ChunkID{0, 0, 0, 1, 10})

report.SetTableStructCheckResult("atest", "tbl", true, false)
report.SetTableStructCheckResult("atest", "tbl", true, false, 0)
report.SetTableDataCheckResult("atest", "tbl", false, 100, 200, 500, 600, &chunk.ChunkID{0, 0, 0, 2, 10})

report.SetTableStructCheckResult("xtest", "tbl", false, false)
report.SetTableStructCheckResult("xtest", "tbl", false, false, 0)
report.SetTableDataCheckResult("xtest", "tbl", false, 100, 200, 600, 700, &chunk.ChunkID{0, 0, 0, 3, 10})

report.SetTableStructCheckResult("xtest", "tb1", false, true, 1)
report.SetTableDataCheckResult("xtest", "tb1", false, 0, 200, 0, 200, &chunk.ChunkID{0, 0, 0, 4, 10})

report.SetTableStructCheckResult("xtest", "tb2", false, true, -1)
report.SetTableDataCheckResult("xtest", "tb2", false, 100, 0, 100, 0, &chunk.ChunkID{0, 0, 0, 5, 10})

err = report.CommitSummary()
require.NoError(t, err)
filename := path.Join(outputDir, "summary.txt")
file, err := os.Open(filename)
require.NoError(t, err)

p := make([]byte, 1024)
p := make([]byte, 2048)
file.Read(p)
str := string(p)
require.Contains(t, str, "Summary\n\n\n\n"+
Expand All @@ -480,13 +498,17 @@ func TestCommitSummary(t *testing.T) {
"| `ytest`.`tbl` | 0 | 0 |\n"+
"+---------------+---------+-----------+\n\n\n"+
"The following tables contains inconsistent data\n\n"+
"+---------------+--------------------+----------------+---------+-----------+\n"+
"| TABLE | STRUCTURE EQUALITY | DATA DIFF ROWS | UPCOUNT | DOWNCOUNT |\n"+
"+---------------+--------------------+----------------+---------+-----------+\n")
"+---------------+---------+--------------------+----------------+---------+-----------+\n"+
"| TABLE | RESULT | STRUCTURE EQUALITY | DATA DIFF ROWS | UPCOUNT | DOWNCOUNT |\n"+
"+---------------+---------+--------------------+----------------+---------+-----------+\n")
require.Contains(t, str,
"| `atest`.`tbl` | succeed | true | +100/-200 | 500 | 600 |\n")
require.Contains(t, str,
"| `xtest`.`tbl` | succeed | false | +100/-200 | 600 | 700 |\n")
require.Contains(t, str,
"| `atest`.`tbl` | true | +100/-200 | 500 | 600 |\n")
"| `xtest`.`tb1` | skipped | false | +0/-200 | 0 | 200 |\n")
require.Contains(t, str,
"| `xtest`.`tbl` | false | +100/-200 | 600 | 700 |\n")
"| `xtest`.`tb2` | skipped | false | +100/-0 | 100 | 0 |\n")

file.Close()
err = os.Remove(filename)
Expand Down
2 changes: 1 addition & 1 deletion sync_diff_inspector/source/chunks_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (t *ChunksIterator) produceChunks(ctx context.Context, startRange *splitter
for ; t.nextTableIndex < len(t.TableDiffs); t.nextTableIndex++ {
curTableIndex := t.nextTableIndex
// skip data-check, but still need to send a empty chunk to make checkpoint continuous
if t.TableDiffs[curTableIndex].IgnoreDataCheck {
if t.TableDiffs[curTableIndex].IgnoreDataCheck || t.TableDiffs[curTableIndex].NeedSkippedTable != 0 {
pool.Apply(func() {
table := t.TableDiffs[curTableIndex]
progressID := dbutil.TableName(table.Schema, table.Table)
Expand Down
5 changes: 5 additions & 0 deletions sync_diff_inspector/source/common/table_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,9 @@ type TableDiff struct {
Collation string `json:"collation"`

ChunkSize int64 `json:"chunk-size"`

// NeedSkippedTable = 1: the table only exists downstream,
// NeedSkippedTable = -1: the table only exists upstream,
// NeedSkippedTable = 0: the table exists both upstream and downstream.
NeedSkippedTable int `json:"-"`
}
Loading