Skip to content

Commit b4be17a

Browse files
authored
sync-diff-inspector: skip validation for tables that exist only upstream or downstream and print skipped information in summary and progress (#693)
ref #692
1 parent 5c7c9ee commit b4be17a

File tree

17 files changed

+485
-113
lines changed

17 files changed

+485
-113
lines changed

sync_diff_inspector/config/config.go

+3
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,8 @@ type Config struct {
374374
CheckStructOnly bool `toml:"check-struct-only" json:"check-struct-only"`
375375
// experimental feature: only check table data without table struct
376376
CheckDataOnly bool `toml:"check-data-only" json:"-"`
377+
// skip validation for tables that don't exist upstream or downstream
378+
SkipNonExistingTable bool `toml:"skip-non-existing-table" json:"-"`
377379
// DMAddr is dm-master's address, the format should like "http://127.0.0.1:8261"
378380
DMAddr string `toml:"dm-addr" json:"dm-addr"`
379381
// DMTask string `toml:"dm-task" json:"dm-task"`
@@ -411,6 +413,7 @@ func NewConfig() *Config {
411413
fs.IntVar(&cfg.CheckThreadCount, "check-thread-count", 4, "how many goroutines are created to check data")
412414
fs.BoolVar(&cfg.ExportFixSQL, "export-fix-sql", true, "set true if want to compare rows or set to false will only compare checksum")
413415
fs.BoolVar(&cfg.CheckStructOnly, "check-struct-only", false, "ignore check table's data")
416+
fs.BoolVar(&cfg.SkipNonExistingTable, "skip-non-existing-table", false, "skip validation for tables that don't exist upstream or downstream")
414417
fs.BoolVar(&cfg.CheckDataOnly, "check-data-only", false, "ignore check table's struct")
415418

416419
_ = fs.MarkHidden("check-data-only")

sync_diff_inspector/diff.go

+20-8
Original file line numberDiff line numberDiff line change
@@ -297,12 +297,16 @@ func (df *Diff) StructEqual(ctx context.Context) error {
297297
tableIndex = df.startRange.ChunkRange.Index.TableIndex
298298
}
299299
for ; tableIndex < len(tables); tableIndex++ {
300-
isEqual, isSkip, err := df.compareStruct(ctx, tableIndex)
301-
if err != nil {
302-
return errors.Trace(err)
300+
isEqual, isSkip, isAllTableExist := false, true, tables[tableIndex].TableLack
301+
if common.AllTableExist(isAllTableExist) {
302+
var err error
303+
isEqual, isSkip, err = df.compareStruct(ctx, tableIndex)
304+
if err != nil {
305+
return errors.Trace(err)
306+
}
303307
}
304-
progress.RegisterTable(dbutil.TableName(tables[tableIndex].Schema, tables[tableIndex].Table), !isEqual, isSkip)
305-
df.report.SetTableStructCheckResult(tables[tableIndex].Schema, tables[tableIndex].Table, isEqual, isSkip)
308+
progress.RegisterTable(dbutil.TableName(tables[tableIndex].Schema, tables[tableIndex].Table), !isEqual, isSkip, isAllTableExist)
309+
df.report.SetTableStructCheckResult(tables[tableIndex].Schema, tables[tableIndex].Table, isEqual, isSkip, isAllTableExist)
306310
}
307311
return nil
308312
}
@@ -411,12 +415,21 @@ func (df *Diff) consume(ctx context.Context, rangeInfo *splitter.RangeInfo) bool
411415
node: rangeInfo.ToNode(),
412416
}
413417
defer func() { df.sqlCh <- dml }()
418+
tableDiff := df.downstream.GetTables()[rangeInfo.GetTableIndex()]
419+
schema, table := tableDiff.Schema, tableDiff.Table
420+
id := rangeInfo.ChunkRange.Index
414421
if rangeInfo.ChunkRange.Type == chunk.Empty {
415422
dml.node.State = checkpoints.IgnoreState
423+
// for tables that don't exist upstream or downstream
424+
if !common.AllTableExist(tableDiff.TableLack) {
425+
upCount := df.upstream.GetCountForLackTable(ctx, rangeInfo)
426+
downCount := df.downstream.GetCountForLackTable(ctx, rangeInfo)
427+
df.report.SetTableDataCheckResult(schema, table, false, int(upCount), int(downCount), upCount, downCount, id)
428+
return false
429+
}
416430
return true
417431
}
418-
tableDiff := df.downstream.GetTables()[rangeInfo.GetTableIndex()]
419-
schema, table := tableDiff.Schema, tableDiff.Table
432+
420433
var state string = checkpoints.SuccessState
421434

422435
isEqual, upCount, downCount, err := df.compareChecksumAndGetCount(ctx, rangeInfo)
@@ -447,7 +460,6 @@ func (df *Diff) consume(ctx context.Context, rangeInfo *splitter.RangeInfo) bool
447460
isEqual = isDataEqual
448461
}
449462
dml.node.State = state
450-
id := rangeInfo.ChunkRange.Index
451463
df.report.SetTableDataCheckResult(schema, table, isEqual, dml.rowAdd, dml.rowDelete, upCount, downCount, id)
452464
return isEqual
453465
}

sync_diff_inspector/progress/progress.go

+34-7
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"os"
2121
"strings"
2222
"time"
23+
24+
"github.com/pingcap/tidb-tools/sync_diff_inspector/source/common"
2325
)
2426

2527
type TableProgressPrinter struct {
@@ -53,7 +55,9 @@ const (
5355
TABLE_STATE_RESULT_FAIL_STRUCTURE_PASS table_state_t = 0x40
5456
TABLE_STATE_RESULT_DIFFERENT table_state_t = 0x80
5557
TABLE_STATE_HEAD table_state_t = 0xff
56-
TABLE_STATE_RESULT_MASK table_state_t = 0xf0
58+
TABLE_STATE_RESULT_MASK table_state_t = 0xff0
59+
TABLE_STATE_NOT_EXSIT_UPSTREAM table_state_t = 0x100
60+
TABLE_STATE_NOT_EXSIT_DOWNSTREAM table_state_t = 0x200
5761
)
5862

5963
type TableProgress struct {
@@ -127,11 +131,18 @@ func (tpp *TableProgressPrinter) UpdateTotal(name string, total int, stopUpdate
127131
}
128132
}
129133

130-
func (tpp *TableProgressPrinter) RegisterTable(name string, isFailed bool, isDone bool) {
134+
func (tpp *TableProgressPrinter) RegisterTable(name string, isFailed bool, isDone bool, isExist int) {
131135
var state table_state_t
132136
if isFailed {
133137
if isDone {
134-
state = TABLE_STATE_RESULT_FAIL_STRUCTURE_DONE | TABLE_STATE_REGISTER
138+
switch isExist {
139+
case common.UpstreamTableLackFlag:
140+
state = TABLE_STATE_NOT_EXSIT_UPSTREAM | TABLE_STATE_REGISTER
141+
case common.DownstreamTableLackFlag:
142+
state = TABLE_STATE_NOT_EXSIT_DOWNSTREAM | TABLE_STATE_REGISTER
143+
default:
144+
state = TABLE_STATE_RESULT_FAIL_STRUCTURE_DONE | TABLE_STATE_REGISTER
145+
}
135146
} else {
136147
state = TABLE_STATE_RESULT_FAIL_STRUCTURE_CONTINUE | TABLE_STATE_REGISTER
137148
}
@@ -181,6 +192,7 @@ func (tpp *TableProgressPrinter) PrintSummary() {
181192
tpp.tableNums,
182193
)
183194
} else {
195+
SkippedNum := 0
184196
for p := tpp.tableFailList.Front(); p != nil; p = p.Next() {
185197
tp := p.Value.(*TableProgress)
186198
if tp.state&(TABLE_STATE_RESULT_FAIL_STRUCTURE_DONE|TABLE_STATE_RESULT_FAIL_STRUCTURE_CONTINUE) != 0 {
@@ -189,10 +201,18 @@ func (tpp *TableProgressPrinter) PrintSummary() {
189201
if tp.state&(TABLE_STATE_RESULT_DIFFERENT) != 0 {
190202
fixStr = fmt.Sprintf("%sThe data of `%s` is not equal.\n", fixStr, tp.name)
191203
}
204+
if tp.state&(TABLE_STATE_NOT_EXSIT_DOWNSTREAM) != 0 {
205+
fixStr = fmt.Sprintf("%sThe data of `%s` does not exist in downstream database.\n", fixStr, tp.name)
206+
SkippedNum++
207+
}
208+
if tp.state&(TABLE_STATE_NOT_EXSIT_UPSTREAM) != 0 {
209+
fixStr = fmt.Sprintf("%sThe data of `%s` does not exist in upstream database.\n", fixStr, tp.name)
210+
SkippedNum++
211+
}
192212
}
193213
fixStr = fmt.Sprintf(
194-
"%s\nThe rest of the tables are all equal.\nThe patch file has been generated to './output_dir/patch.sql'\nYou can view the comparison details through './output_dir/sync_diff_inspector.log'\n",
195-
fixStr,
214+
"%s\nThe rest of the tables are all equal.\nA total of %d tables have been compared, %d tables finished, %d tables failed, %d tables skipped.\nThe patch file has been generated to './output_dir/patch.sql'\nYou can view the comparison details through './output_dir/sync_diff_inspector.log'\n",
215+
fixStr, tpp.tableNums, tpp.tableNums-tpp.tableFailList.Len(), tpp.tableFailList.Len()-SkippedNum, SkippedNum,
196216
)
197217
}
198218

@@ -337,6 +357,13 @@ func (tpp *TableProgressPrinter) flush(stateIsChanged bool) {
337357
tpp.lines++
338358
tpp.progressTableNums++
339359
tp.state = TABLE_STATE_COMPARING
360+
case TABLE_STATE_NOT_EXSIT_UPSTREAM, TABLE_STATE_NOT_EXSIT_DOWNSTREAM:
361+
dynStr = fmt.Sprintf("%sComparing the table data of `%s` ...skipped\n", dynStr, tp.name)
362+
tpp.tableFailList.PushBack(tp)
363+
preNode := p.Prev()
364+
tpp.tableList.Remove(p)
365+
p = preNode
366+
tpp.finishTableNums++
340367
case TABLE_STATE_RESULT_FAIL_STRUCTURE_DONE:
341368
fixStr = fmt.Sprintf("%sComparing the table structure of `%s` ... failure\n", fixStr, tp.name)
342369
tpp.tableFailList.PushBack(tp)
@@ -410,9 +437,9 @@ func UpdateTotal(name string, total int, stopUpdate bool) {
410437
}
411438
}
412439

413-
func RegisterTable(name string, isFailed bool, isDone bool) {
440+
func RegisterTable(name string, isFailed bool, isDone bool, isExist int) {
414441
if progress_ != nil {
415-
progress_.RegisterTable(name, isFailed, isDone)
442+
progress_.RegisterTable(name, isFailed, isDone, isExist)
416443
}
417444
}
418445

sync_diff_inspector/progress/progress_test.go

+21-10
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,18 @@ import (
1919
"testing"
2020
"time"
2121

22+
"github.com/pingcap/tidb-tools/sync_diff_inspector/source/common"
2223
"github.com/stretchr/testify/require"
2324
)
2425

2526
func TestProgress(t *testing.T) {
26-
p := NewTableProgressPrinter(4, 0)
27-
p.RegisterTable("1", true, true)
27+
p := NewTableProgressPrinter(6, 0)
28+
p.RegisterTable("1", true, true, common.AllTableExistFlag)
2829
p.StartTable("1", 50, true)
29-
p.RegisterTable("2", true, false)
30+
p.RegisterTable("2", true, false, common.AllTableExistFlag)
3031
p.StartTable("2", 2, true)
3132
p.Inc("2")
32-
p.RegisterTable("3", false, false)
33+
p.RegisterTable("3", false, false, common.AllTableExistFlag)
3334
p.StartTable("3", 1, false)
3435
p.Inc("2")
3536
p.Inc("3")
@@ -39,6 +40,10 @@ func TestProgress(t *testing.T) {
3940
p.FailTable("4")
4041
p.Inc("3")
4142
p.Inc("4")
43+
p.RegisterTable("5", true, true, common.UpstreamTableLackFlag)
44+
p.StartTable("5", 1, true)
45+
p.RegisterTable("6", true, true, common.DownstreamTableLackFlag)
46+
p.StartTable("6", 1, true)
4247
time.Sleep(500 * time.Millisecond)
4348
p.Close()
4449
buffer := new(bytes.Buffer)
@@ -47,18 +52,21 @@ func TestProgress(t *testing.T) {
4752
require.Equal(
4853
t,
4954
buffer.String(),
50-
"\x1b[1A\x1b[J\nSummary:\n\nThe structure of `1` is not equal.\nThe structure of `2` is not equal.\nThe data of `4` is not equal.\n"+
51-
"\nThe rest of the tables are all equal.\nThe patch file has been generated to './output_dir/patch.sql'\n"+
55+
"\x1b[1A\x1b[J\nSummary:\n\nThe structure of `1` is not equal.\nThe structure of `2` is not equal.\nThe data of `4` is not equal.\nThe data of `5` does not exist in upstream database.\nThe data of `6` does not exist in downstream database.\n"+
56+
"\nThe rest of the tables are all equal.\nA total of 6 tables have been compared, 1 tables finished, 3 tables failed, 2 tables skipped.\nThe patch file has been generated to './output_dir/patch.sql'\n"+
5257
"You can view the comparison details through './output_dir/sync_diff_inspector.log'\n\n",
5358
)
5459
}
5560

5661
func TestTableError(t *testing.T) {
5762
p := NewTableProgressPrinter(4, 0)
58-
p.RegisterTable("1", true, true)
63+
p.RegisterTable("1", true, true, common.AllTableExistFlag)
5964
p.StartTable("1", 50, true)
60-
p.RegisterTable("2", true, true)
65+
p.RegisterTable("2", true, true, common.AllTableExistFlag)
6166
p.StartTable("2", 1, true)
67+
p.RegisterTable("3", true, true, common.DownstreamTableLackFlag)
68+
p.StartTable("3", 1, true)
69+
6270
p.Inc("2")
6371
buffer := new(bytes.Buffer)
6472
p.SetOutput(buffer)
@@ -73,16 +81,19 @@ func TestTableError(t *testing.T) {
7381
"\x1b[2A\x1b[JComparing the table structure of `2` ... failure\n"+
7482
"_____________________________________________________________________________\n"+
7583
"Progress [==============================>------------------------------] 50% 0/0\n"+
84+
"\x1b[2A\x1b[JComparing the table data of `3` ...skipped\n"+
85+
"_____________________________________________________________________________\n"+
86+
"Progress [=============================================>---------------] 75% 0/1\n"+
7687
"\x1b[1A\x1b[J\nError in comparison process:\n[aaa]\n\n"+
7788
"You can view the comparison details through './output_dir/sync_diff_inspector.log'\n",
7889
)
7990
}
8091

8192
func TestAllSuccess(t *testing.T) {
8293
Init(2, 0)
83-
RegisterTable("1", false, false)
94+
RegisterTable("1", false, false, common.AllTableExistFlag)
8495
StartTable("1", 1, true)
85-
RegisterTable("2", false, false)
96+
RegisterTable("2", false, false, common.AllTableExistFlag)
8697
StartTable("2", 1, true)
8798
Inc("1")
8899
Inc("2")

sync_diff_inspector/report/report.go

+32-14
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ type TableResult struct {
6565
ChunkMap map[string]*ChunkResult `json:"chunk-result"` // `ChunkMap` stores the `ChunkResult` of each chunk of the table
6666
UpCount int64 `json:"up-count"` // `UpCount` is the number of rows in the table from upstream
6767
DownCount int64 `json:"down-count"` // `DownCount` is the number of rows in the table from downstream
68-
68+
TableLack int `json:"table-lack"`
6969
}
7070

7171
// ChunkResult save the necessarily information to provide summary information
@@ -80,6 +80,7 @@ type Report struct {
8080
Result string `json:"-"` // Result is pass or fail
8181
PassNum int32 `json:"-"` // The pass number of tables
8282
FailedNum int32 `json:"-"` // The failed number of tables
83+
SkippedNum int32 `json:"-"` // The skipped number of tables
8384
TableResults map[string]map[string]*TableResult `json:"table-results"` // TableResult saved the map of `schema` => `table` => `tableResult`
8485
StartTime time.Time `json:"start-time"`
8586
Duration time.Duration `json:"time-duration"`
@@ -131,6 +132,11 @@ func (r *Report) getDiffRows() [][]string {
131132
}
132133
diffRow := make([]string, 0)
133134
diffRow = append(diffRow, dbutil.TableName(schema, table))
135+
if !common.AllTableExist(result.TableLack) {
136+
diffRow = append(diffRow, "skipped")
137+
} else {
138+
diffRow = append(diffRow, "succeed")
139+
}
134140
if !result.StructEqual {
135141
diffRow = append(diffRow, "false")
136142
} else {
@@ -154,7 +160,6 @@ func (r *Report) CalculateTotalSize(ctx context.Context, db *sql.DB) {
154160
for schema, tableMap := range r.TableResults {
155161
for table := range tableMap {
156162
size, err := utils.GetTableSize(ctx, db, schema, table)
157-
158163
if size == 0 || err != nil {
159164
log.Warn("fail to get the correct size of table, if you want to get the correct size, please analyze the corresponding tables", zap.String("table", dbutil.TableName(schema, table)), zap.Error(err))
160165
} else {
@@ -166,18 +171,21 @@ func (r *Report) CalculateTotalSize(ctx context.Context, db *sql.DB) {
166171

167172
// CommitSummary commit summary info
168173
func (r *Report) CommitSummary() error {
169-
passNum, failedNum := int32(0), int32(0)
174+
passNum, failedNum, skippedNum := int32(0), int32(0), int32(0)
170175
for _, tableMap := range r.TableResults {
171176
for _, result := range tableMap {
172177
if result.StructEqual && result.DataEqual {
173178
passNum++
179+
} else if !common.AllTableExist(result.TableLack) {
180+
skippedNum++
174181
} else {
175182
failedNum++
176183
}
177184
}
178185
}
179186
r.PassNum = passNum
180187
r.FailedNum = failedNum
188+
r.SkippedNum = skippedNum
181189
summaryPath := filepath.Join(r.task.OutputDir, "summary.txt")
182190
summaryFile, err := os.Create(summaryPath)
183191
if err != nil {
@@ -208,11 +216,11 @@ func (r *Report) CommitSummary() error {
208216
summaryFile.WriteString(tableString.String())
209217
summaryFile.WriteString("\n\n")
210218
}
211-
if r.Result == Fail {
219+
if r.Result == Fail || r.SkippedNum != 0 {
212220
summaryFile.WriteString("The following tables contains inconsistent data\n\n")
213221
tableString := &strings.Builder{}
214222
table := tablewriter.NewWriter(tableString)
215-
table.SetHeader([]string{"Table", "Structure equality", "Data diff rows", "UpCount", "DownCount"})
223+
table.SetHeader([]string{"Table", "Result", "Structure equality", "Data diff rows", "UpCount", "DownCount"})
216224
diffRows := r.getDiffRows()
217225
for _, v := range diffRows {
218226
table.Append(v)
@@ -228,26 +236,35 @@ func (r *Report) CommitSummary() error {
228236

229237
func (r *Report) Print(w io.Writer) error {
230238
var summary strings.Builder
231-
if r.Result == Pass {
232-
summary.WriteString(fmt.Sprintf("A total of %d table have been compared and all are equal.\n", r.FailedNum+r.PassNum))
239+
if r.Result == Pass && r.SkippedNum == 0 {
240+
summary.WriteString(fmt.Sprintf("A total of %d table have been compared and all are equal.\n", r.FailedNum+r.PassNum+r.SkippedNum))
233241
summary.WriteString(fmt.Sprintf("You can view the comparision details through '%s/%s'\n", r.task.OutputDir, config.LogFileName))
234-
} else if r.Result == Fail {
242+
} else if r.Result == Fail || r.SkippedNum != 0 {
235243
for schema, tableMap := range r.TableResults {
236244
for table, result := range tableMap {
237245
if !result.StructEqual {
238246
if result.DataSkip {
239-
summary.WriteString(fmt.Sprintf("The structure of %s is not equal, and data-check is skipped\n", dbutil.TableName(schema, table)))
247+
switch result.TableLack {
248+
case common.UpstreamTableLackFlag:
249+
summary.WriteString(fmt.Sprintf("The data of %s does not exist in upstream database\n", dbutil.TableName(schema, table)))
250+
case common.DownstreamTableLackFlag:
251+
summary.WriteString(fmt.Sprintf("The data of %s does not exist in downstream database\n", dbutil.TableName(schema, table)))
252+
default:
253+
summary.WriteString(fmt.Sprintf("The structure of %s is not equal, and data-check is skipped\n", dbutil.TableName(schema, table)))
254+
}
240255
} else {
241256
summary.WriteString(fmt.Sprintf("The structure of %s is not equal\n", dbutil.TableName(schema, table)))
242257
}
243258
}
244-
if !result.DataEqual {
259+
if !result.DataEqual && common.AllTableExist(result.TableLack) {
245260
summary.WriteString(fmt.Sprintf("The data of %s is not equal\n", dbutil.TableName(schema, table)))
246261
}
247262
}
248263
}
249264
summary.WriteString("\n")
250265
summary.WriteString("The rest of tables are all equal.\n")
266+
summary.WriteString("\n")
267+
summary.WriteString(fmt.Sprintf("A total of %d tables have been compared, %d tables finished, %d tables failed, %d tables skipped.\n", r.FailedNum+r.PassNum+r.SkippedNum, r.PassNum, r.FailedNum, r.SkippedNum))
251268
summary.WriteString(fmt.Sprintf("The patch file has been generated in \n\t'%s/'\n", r.task.FixDir))
252269
summary.WriteString(fmt.Sprintf("You can view the comparision details through '%s/%s'\n", r.task.OutputDir, config.LogFileName))
253270
} else {
@@ -295,13 +312,14 @@ func (r *Report) Init(tableDiffs []*common.TableDiff, sourceConfig [][]byte, tar
295312
}
296313

297314
// SetTableStructCheckResult sets the struct check result for table.
298-
func (r *Report) SetTableStructCheckResult(schema, table string, equal bool, skip bool) {
315+
func (r *Report) SetTableStructCheckResult(schema, table string, equal bool, skip bool, exist int) {
299316
r.Lock()
300317
defer r.Unlock()
301318
tableResult := r.TableResults[schema][table]
302319
tableResult.StructEqual = equal
303320
tableResult.DataSkip = skip
304-
if !equal && r.Result != Error {
321+
tableResult.TableLack = exist
322+
if !equal && common.AllTableExist(tableResult.TableLack) && r.Result != Error {
305323
r.Result = Fail
306324
}
307325
}
@@ -323,11 +341,11 @@ func (r *Report) SetTableDataCheckResult(schema, table string, equal bool, rowsA
323341
}
324342
result.ChunkMap[id.ToString()].RowsAdd += rowsAdd
325343
result.ChunkMap[id.ToString()].RowsDelete += rowsDelete
326-
if r.Result != Error {
344+
if r.Result != Error && common.AllTableExist(result.TableLack) {
327345
r.Result = Fail
328346
}
329347
}
330-
if !equal && r.Result != Error {
348+
if !equal && common.AllTableExist(result.TableLack) && r.Result != Error {
331349
r.Result = Fail
332350
}
333351
}

0 commit comments

Comments
 (0)