Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync-diff-inspector: skip validation for tables that exist only upstream or downstream and print skipped information in summary and progress #693

Merged
merged 21 commits into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sync_diff_inspector/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,8 @@ type Config struct {
CheckStructOnly bool `toml:"check-struct-only" json:"check-struct-only"`
// experimental feature: only check table data without table struct
CheckDataOnly bool `toml:"check-data-only" json:"-"`
// skip validation for tables that don't exist upstream or downstream
SkipNonExistingTable bool `toml:"skip-non-existing-table" json:"-"`
// DMAddr is dm-master's address, the format should like "http://127.0.0.1:8261"
DMAddr string `toml:"dm-addr" json:"dm-addr"`
// DMTask string `toml:"dm-task" json:"dm-task"`
Expand Down Expand Up @@ -411,6 +413,7 @@ func NewConfig() *Config {
fs.IntVar(&cfg.CheckThreadCount, "check-thread-count", 4, "how many goroutines are created to check data")
fs.BoolVar(&cfg.ExportFixSQL, "export-fix-sql", true, "set true if want to compare rows or set to false will only compare checksum")
fs.BoolVar(&cfg.CheckStructOnly, "check-struct-only", false, "ignore check table's data")
fs.BoolVar(&cfg.SkipNonExistingTable, "skip-non-existing-table", false, "skip validation for tables that don't exist upstream or downstream")
fs.BoolVar(&cfg.CheckDataOnly, "check-data-only", false, "ignore check table's struct")

_ = fs.MarkHidden("check-data-only")
Expand Down
31 changes: 23 additions & 8 deletions sync_diff_inspector/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,19 @@ func (df *Diff) StructEqual(ctx context.Context) error {
tableIndex = df.startRange.ChunkRange.Index.TableIndex
}
for ; tableIndex < len(tables); tableIndex++ {
isEqual, isSkip, err := df.compareStruct(ctx, tableIndex)
if err != nil {
return errors.Trace(err)
var isEqual, isSkip bool
isAllTableExist := tables[tableIndex].NeedSkippedTable
if source.AllTableExist(tables[tableIndex]) {
var err error
isEqual, isSkip, err = df.compareStruct(ctx, tableIndex)
if err != nil {
return errors.Trace(err)
}
} else {
isEqual, isSkip = false, true
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var isEqual, isSkip bool
isAllTableExist := tables[tableIndex].NeedSkippedTable
if source.AllTableExist(tables[tableIndex]) {
var err error
isEqual, isSkip, err = df.compareStruct(ctx, tableIndex)
if err != nil {
return errors.Trace(err)
}
} else {
isEqual, isSkip = false, true
}
isEqual, isSkip, isAllTableExist := false, true, tables[tableIndex].NeedSkippedTable
if source.AllTableExist(tables[tableIndex]) {
var err error
isEqual, isSkip, err = df.compareStruct(ctx, tableIndex)
if err != nil {
return errors.Trace(err)
}
}

progress.RegisterTable(dbutil.TableName(tables[tableIndex].Schema, tables[tableIndex].Table), !isEqual, isSkip)
df.report.SetTableStructCheckResult(tables[tableIndex].Schema, tables[tableIndex].Table, isEqual, isSkip)
progress.RegisterTable(dbutil.TableName(tables[tableIndex].Schema, tables[tableIndex].Table), !isEqual, isSkip, isAllTableExist)
df.report.SetTableStructCheckResult(tables[tableIndex].Schema, tables[tableIndex].Table, isEqual, isSkip, isAllTableExist)
}
return nil
}
Expand Down Expand Up @@ -411,12 +418,21 @@ func (df *Diff) consume(ctx context.Context, rangeInfo *splitter.RangeInfo) bool
node: rangeInfo.ToNode(),
}
defer func() { df.sqlCh <- dml }()
tableDiff := df.downstream.GetTables()[rangeInfo.GetTableIndex()]
schema, table := tableDiff.Schema, tableDiff.Table
id := rangeInfo.ChunkRange.Index
if rangeInfo.ChunkRange.Type == chunk.Empty {
dml.node.State = checkpoints.IgnoreState
// for tables that don't exist upstream or downstream
if !source.AllTableExist(tableDiff) {
upCount, _ := dbutil.GetRowCount(ctx, df.upstream.GetDB(), schema, table, "", nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if upstream has multiple mysql sources?

downCount, _ := dbutil.GetRowCount(ctx, df.downstream.GetDB(), schema, table, "", nil)
df.report.SetTableDataCheckResult(schema, table, false, int(upCount), int(downCount), upCount, downCount, id)
return false
}
return true
}
tableDiff := df.downstream.GetTables()[rangeInfo.GetTableIndex()]
schema, table := tableDiff.Schema, tableDiff.Table

var state string = checkpoints.SuccessState

isEqual, upCount, downCount, err := df.compareChecksumAndGetCount(ctx, rangeInfo)
Expand Down Expand Up @@ -447,7 +463,6 @@ func (df *Diff) consume(ctx context.Context, rangeInfo *splitter.RangeInfo) bool
isEqual = isDataEqual
}
dml.node.State = state
id := rangeInfo.ChunkRange.Index
df.report.SetTableDataCheckResult(schema, table, isEqual, dml.rowAdd, dml.rowDelete, upCount, downCount, id)
return isEqual
}
Expand Down
40 changes: 33 additions & 7 deletions sync_diff_inspector/progress/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"os"
"strings"
"time"

"github.com/pingcap/tidb-tools/sync_diff_inspector/source/common"
)

type TableProgressPrinter struct {
Expand Down Expand Up @@ -53,7 +55,9 @@ const (
TABLE_STATE_RESULT_FAIL_STRUCTURE_PASS table_state_t = 0x40
TABLE_STATE_RESULT_DIFFERENT table_state_t = 0x80
TABLE_STATE_HEAD table_state_t = 0xff
TABLE_STATE_RESULT_MASK table_state_t = 0xf0
TABLE_STATE_RESULT_MASK table_state_t = 0xff0
TABLE_STATE_NOT_EXSIT_UPSTREAM table_state_t = 0x100
TABLE_STATE_NOT_EXSIT_DOWNSTREAM table_state_t = 0x200
)

type TableProgress struct {
Expand Down Expand Up @@ -127,11 +131,17 @@ func (tpp *TableProgressPrinter) UpdateTotal(name string, total int, stopUpdate
}
}

func (tpp *TableProgressPrinter) RegisterTable(name string, isFailed bool, isDone bool) {
func (tpp *TableProgressPrinter) RegisterTable(name string, isFailed bool, isDone bool, isExist int) {
var state table_state_t
if isFailed {
if isDone {
state = TABLE_STATE_RESULT_FAIL_STRUCTURE_DONE | TABLE_STATE_REGISTER
if isExist == common.UpstreamTableLackFlag {
state = TABLE_STATE_NOT_EXSIT_UPSTREAM | TABLE_STATE_REGISTER
} else if isExist == common.DownstreamTableLackFlag {
state = TABLE_STATE_NOT_EXSIT_DOWNSTREAM | TABLE_STATE_REGISTER
} else {
state = TABLE_STATE_RESULT_FAIL_STRUCTURE_DONE | TABLE_STATE_REGISTER
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if isExist == common.UpstreamTableLackFlag {
state = TABLE_STATE_NOT_EXSIT_UPSTREAM | TABLE_STATE_REGISTER
} else if isExist == common.DownstreamTableLackFlag {
state = TABLE_STATE_NOT_EXSIT_DOWNSTREAM | TABLE_STATE_REGISTER
} else {
state = TABLE_STATE_RESULT_FAIL_STRUCTURE_DONE | TABLE_STATE_REGISTER
}
switch isExist {
case common.UpstreamTableLackFlag:
state = TABLE_STATE_NOT_EXSIT_UPSTREAM | TABLE_STATE_REGISTER
case common.DownstreamTableLackFlag:
state = TABLE_STATE_NOT_EXSIT_DOWNSTREAM | TABLE_STATE_REGISTER
default:
state = TABLE_STATE_RESULT_FAIL_STRUCTURE_DONE | TABLE_STATE_REGISTER
}

} else {
state = TABLE_STATE_RESULT_FAIL_STRUCTURE_CONTINUE | TABLE_STATE_REGISTER
}
Expand Down Expand Up @@ -181,6 +191,7 @@ func (tpp *TableProgressPrinter) PrintSummary() {
tpp.tableNums,
)
} else {
SkippedNum := 0
for p := tpp.tableFailList.Front(); p != nil; p = p.Next() {
tp := p.Value.(*TableProgress)
if tp.state&(TABLE_STATE_RESULT_FAIL_STRUCTURE_DONE|TABLE_STATE_RESULT_FAIL_STRUCTURE_CONTINUE) != 0 {
Expand All @@ -189,10 +200,18 @@ func (tpp *TableProgressPrinter) PrintSummary() {
if tp.state&(TABLE_STATE_RESULT_DIFFERENT) != 0 {
fixStr = fmt.Sprintf("%sThe data of `%s` is not equal.\n", fixStr, tp.name)
}
if tp.state&(TABLE_STATE_NOT_EXSIT_DOWNSTREAM) != 0 {
fixStr = fmt.Sprintf("%sThe data of `%s` does not exist in downstream database.\n", fixStr, tp.name)
SkippedNum++
}
if tp.state&(TABLE_STATE_NOT_EXSIT_UPSTREAM) != 0 {
fixStr = fmt.Sprintf("%sThe data of `%s` does not exist in upstream database.\n", fixStr, tp.name)
SkippedNum++
}
}
fixStr = fmt.Sprintf(
"%s\nThe rest of the tables are all equal.\nThe patch file has been generated to './output_dir/patch.sql'\nYou can view the comparison details through './output_dir/sync_diff_inspector.log'\n",
fixStr,
"%s\nThe rest of the tables are all equal.\nA total of %d tables have been compared, %d tables finished, %d tables failed, %d tables skipped.\nThe patch file has been generated to './output_dir/patch.sql'\nYou can view the comparison details through './output_dir/sync_diff_inspector.log'\n",
fixStr, tpp.tableNums, tpp.tableNums-tpp.tableFailList.Len(), tpp.tableFailList.Len()-SkippedNum, SkippedNum,
)
}

Expand Down Expand Up @@ -337,6 +356,13 @@ func (tpp *TableProgressPrinter) flush(stateIsChanged bool) {
tpp.lines++
tpp.progressTableNums++
tp.state = TABLE_STATE_COMPARING
case TABLE_STATE_NOT_EXSIT_UPSTREAM, TABLE_STATE_NOT_EXSIT_DOWNSTREAM:
dynStr = fmt.Sprintf("%sComparing the table data of `%s` ...skipped\n", dynStr, tp.name)
tpp.tableFailList.PushBack(tp)
preNode := p.Prev()
tpp.tableList.Remove(p)
p = preNode
tpp.finishTableNums++
case TABLE_STATE_RESULT_FAIL_STRUCTURE_DONE:
fixStr = fmt.Sprintf("%sComparing the table structure of `%s` ... failure\n", fixStr, tp.name)
tpp.tableFailList.PushBack(tp)
Expand Down Expand Up @@ -410,9 +436,9 @@ func UpdateTotal(name string, total int, stopUpdate bool) {
}
}

func RegisterTable(name string, isFailed bool, isDone bool) {
func RegisterTable(name string, isFailed bool, isDone bool, isExist int) {
if progress_ != nil {
progress_.RegisterTable(name, isFailed, isDone)
progress_.RegisterTable(name, isFailed, isDone, isExist)
}
}

Expand Down
24 changes: 15 additions & 9 deletions sync_diff_inspector/progress/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ import (

func TestProgress(t *testing.T) {
p := NewTableProgressPrinter(4, 0)
p.RegisterTable("1", true, true)
p.RegisterTable("1", true, true, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use constant instead of 1 0.

p.StartTable("1", 50, true)
p.RegisterTable("2", true, false)
p.RegisterTable("2", true, false, 0)
p.StartTable("2", 2, true)
p.Inc("2")
p.RegisterTable("3", false, false)
p.RegisterTable("3", false, false, 0)
p.StartTable("3", 1, false)
p.Inc("2")
p.Inc("3")
Expand All @@ -47,18 +47,21 @@ func TestProgress(t *testing.T) {
require.Equal(
t,
buffer.String(),
"\x1b[1A\x1b[J\nSummary:\n\nThe structure of `1` is not equal.\nThe structure of `2` is not equal.\nThe data of `4` is not equal.\n"+
"\nThe rest of the tables are all equal.\nThe patch file has been generated to './output_dir/patch.sql'\n"+
"\x1b[1A\x1b[J\nSummary:\n\nThe data of `1` does not exist in upstream database.\nThe structure of `2` is not equal.\nThe data of `4` is not equal.\n"+
"\nThe rest of the tables are all equal.\nA total of 4 tables have been compared, 1 tables finished, 2 tables failed, 1 tables skipped.\nThe patch file has been generated to './output_dir/patch.sql'\n"+
"You can view the comparison details through './output_dir/sync_diff_inspector.log'\n\n",
)
}

func TestTableError(t *testing.T) {
p := NewTableProgressPrinter(4, 0)
p.RegisterTable("1", true, true)
p.RegisterTable("1", true, true, 0)
p.StartTable("1", 50, true)
p.RegisterTable("2", true, true)
p.RegisterTable("2", true, true, 0)
p.StartTable("2", 1, true)
p.RegisterTable("3", true, true, -1)
p.StartTable("3", 1, true)

p.Inc("2")
buffer := new(bytes.Buffer)
p.SetOutput(buffer)
Expand All @@ -73,16 +76,19 @@ func TestTableError(t *testing.T) {
"\x1b[2A\x1b[JComparing the table structure of `2` ... failure\n"+
"_____________________________________________________________________________\n"+
"Progress [==============================>------------------------------] 50% 0/0\n"+
"\x1b[2A\x1b[JComparing the table data of `3` ...skipped\n"+
"_____________________________________________________________________________\n"+
"Progress [=============================================>---------------] 75% 0/1\n"+
"\x1b[1A\x1b[J\nError in comparison process:\n[aaa]\n\n"+
"You can view the comparison details through './output_dir/sync_diff_inspector.log'\n",
)
}

func TestAllSuccess(t *testing.T) {
Init(2, 0)
RegisterTable("1", false, false)
RegisterTable("1", false, false, 0)
StartTable("1", 1, true)
RegisterTable("2", false, false)
RegisterTable("2", false, false, 0)
StartTable("2", 1, true)
Inc("1")
Inc("2")
Expand Down
Loading