Skip to content

Commit

Permalink
puller(ticdc): fix renameTables ddl job (#9471) (#9495)
Browse files Browse the repository at this point in the history
close #9476
  • Loading branch information
ti-chi-bot authored Aug 4, 2023
1 parent 22d39ad commit 51fb570
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 25 deletions.
52 changes: 28 additions & 24 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,37 +270,41 @@ func (p *ddlJobPullerImpl) handleRenameTables(job *timodel.Job) (skip bool, err
remainTables := make([]*timodel.TableInfo, 0, len(multiTableInfos))
snap := p.schemaStorage.GetLastSnapshot()
for i, tableInfo := range multiTableInfos {
schema, ok := snap.SchemaByID(newSchemaIDs[i])
var shouldDiscardOldTable, shouldDiscardNewTable bool
oldTable, ok := snap.PhysicalTableByID(tableInfo.ID)
if !ok {
return true, cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs(newSchemaIDs[i])
shouldDiscardOldTable = true
} else {
shouldDiscardOldTable = p.filter.ShouldDiscardDDL(job.Type, oldSchemaNames[i].O, oldTable.Name.O)
}
table, ok := snap.PhysicalTableByID(tableInfo.ID)

newSchemaName, ok := snap.SchemaByID(newSchemaIDs[i])
if !ok {
// if a table is not found and its new name is in filter rule, return error.
if !p.filter.ShouldDiscardDDL(job.Type, schema.Name.O, newTableNames[i].O) {
return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(tableInfo.ID, job.Query)
}
continue
// the new table name does not hit the filter rule, so we should discard the table.
shouldDiscardNewTable = true
} else {
shouldDiscardNewTable = p.filter.ShouldDiscardDDL(job.Type, newSchemaName.Name.O, newTableNames[i].O)
}
// we skip a rename table ddl only when its old table name and new table name are both filtered.
skip := p.filter.ShouldDiscardDDL(job.Type, oldSchemaNames[i].O, table.Name.O)
if skip {
// if a table should be skipped by its old name and its new name is in filter rule, return error.
if !p.filter.ShouldDiscardDDL(job.Type, schema.Name.O, newTableNames[i].O) {
return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(tableInfo.ID, job.Query)
}
log.Info("table is filtered",

if shouldDiscardOldTable && shouldDiscardNewTable {
// skip a rename table ddl only when its old table name and new table name are both filtered.
log.Info("RenameTables is filtered",
zap.Int64("tableID", tableInfo.ID),
zap.String("schema", oldSchemaNames[i].O),
zap.String("table", table.Name.O))
} else {
remainTables = append(remainTables, tableInfo)
remainOldSchemaIDs = append(remainOldSchemaIDs, oldSchemaIDs[i])
remainNewSchemaIDs = append(remainNewSchemaIDs, newSchemaIDs[i])
remainOldTableIDs = append(remainOldTableIDs, oldTableIDs[i])
remainNewTableNames = append(remainNewTableNames, newTableNames[i])
remainOldSchemaNames = append(remainOldSchemaNames, oldSchemaNames[i])
zap.String("query", job.Query))
continue
}
if shouldDiscardOldTable && !shouldDiscardNewTable {
// if old table is not in filter rule and its new name is in filter rule, return error.
return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(tableInfo.ID, job.Query)
}
// old table name matches the filter rule, remain it.
remainTables = append(remainTables, tableInfo)
remainOldSchemaIDs = append(remainOldSchemaIDs, oldSchemaIDs[i])
remainNewSchemaIDs = append(remainNewSchemaIDs, newSchemaIDs[i])
remainOldTableIDs = append(remainOldTableIDs, oldTableIDs[i])
remainNewTableNames = append(remainNewTableNames, newTableNames[i])
remainOldSchemaNames = append(remainOldSchemaNames, oldSchemaNames[i])
}

if len(remainTables) == 0 {
Expand Down
12 changes: 11 additions & 1 deletion cdc/puller/ddl_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,17 @@ func TestHandleRenameTable(t *testing.T) {
mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1)
waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1)

job = helper.DDL2Job("rename table test1.t1 to test1.t11, test1.t3 to test1.t33, test1.t5 to test1.t55")
job = helper.DDL2Job("create database ignore1")
mockPuller.appendDDL(job)
mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1)
waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1)

job = helper.DDL2Job("create table ignore1.a(id int)")
mockPuller.appendDDL(job)
mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1)
waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1)

job = helper.DDL2Job("rename table test1.t1 to test1.t11, test1.t3 to test1.t33, test1.t5 to test1.t55, ignore1.a to ignore1.b")

skip, err := ddlJobPullerImpl.handleRenameTables(job)
require.NoError(t, err)
Expand Down

0 comments on commit 51fb570

Please sign in to comment.