diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index d3a3d2d81f58a..f6439c198ab9c 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -1001,6 +1001,9 @@ func (bc *Client) findRegionLeader(ctx context.Context, key []byte, isRawKv bool // in order to find the correct region. key = codec.EncodeBytesExt([]byte{}, key, isRawKv) for i := 1; i < 100; i++ { + if err := ctx.Err(); err != nil { + return nil, err + } // better backoff. region, err := bc.mgr.GetPDClient().GetRegion(ctx, key) if err != nil || region == nil { diff --git a/br/pkg/lightning/backend/tidb/tidb_test.go b/br/pkg/lightning/backend/tidb/tidb_test.go index d3ce4b3809bea..c2ba7f9f29e05 100644 --- a/br/pkg/lightning/backend/tidb/tidb_test.go +++ b/br/pkg/lightning/backend/tidb/tidb_test.go @@ -491,35 +491,35 @@ func TestWriteRowsErrorDowngradingAll(t *testing.T) { ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1)\\E"). WillReturnError(nonRetryableError) s.mockDB. - ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*"). + ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*"). WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "7.csv", int64(0), nonRetryableError.Error(), "(1)"). WillReturnResult(driver.ResultNoRows) s.mockDB. ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(2)\\E"). WillReturnError(nonRetryableError) s.mockDB. - ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*"). + ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*"). WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "8.csv", int64(0), nonRetryableError.Error(), "(2)"). WillReturnResult(driver.ResultNoRows) s.mockDB. ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(3)\\E"). WillReturnError(nonRetryableError) s.mockDB. - ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*"). + ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*"). WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "9.csv", int64(0), nonRetryableError.Error(), "(3)"). WillReturnResult(driver.ResultNoRows) s.mockDB. ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(4)\\E"). WillReturnError(nonRetryableError) s.mockDB. - ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*"). + ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*"). WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "10.csv", int64(0), nonRetryableError.Error(), "(4)"). WillReturnResult(driver.ResultNoRows) s.mockDB. ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(5)\\E"). WillReturnError(nonRetryableError) s.mockDB. - ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*"). + ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*"). WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "11.csv", int64(0), nonRetryableError.Error(), "(5)"). WillReturnResult(driver.ResultNoRows) @@ -557,21 +557,21 @@ func TestWriteRowsErrorDowngradingExceedThreshold(t *testing.T) { ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1)\\E"). WillReturnError(nonRetryableError) s.mockDB. - ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*"). + ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*"). WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "7.csv", int64(0), nonRetryableError.Error(), "(1)"). WillReturnResult(driver.ResultNoRows) s.mockDB. ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(2)\\E"). WillReturnError(nonRetryableError) s.mockDB. - ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*"). + ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*"). WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "8.csv", int64(0), nonRetryableError.Error(), "(2)"). WillReturnResult(driver.ResultNoRows) s.mockDB. ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(3)\\E"). WillReturnError(nonRetryableError) s.mockDB. - ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*"). + ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*"). WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "9.csv", int64(0), nonRetryableError.Error(), "(3)"). WillReturnResult(driver.ResultNoRows) // the forth row will exceed the error threshold, won't record this error diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index 87256290c9072..1717cfdf17022 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -375,11 +375,11 @@ const ( // DupeResAlgNone doesn't detect duplicate. DupeResAlgNone DuplicateResolutionAlgorithm = iota - // DupeResAlgRecord only records duplicate records to `lightning_task_info.conflict_error_v1` table on the target TiDB. + // DupeResAlgRecord only records duplicate records to `lightning_task_info.conflict_error_v1_2` table on the target TiDB. DupeResAlgRecord // DupeResAlgRemove records all duplicate records like the 'record' algorithm and remove all information related to the - // duplicated rows. Users need to analyze the lightning_task_info.conflict_error_v1 table to add back the correct rows. + // duplicated rows. Users need to analyze the lightning_task_info.conflict_error_v1_2 table to add back the correct rows. DupeResAlgRemove // DupeResAlgErr reports an error and stops the import process. diff --git a/br/pkg/lightning/errormanager/errormanager.go b/br/pkg/lightning/errormanager/errormanager.go index 373ba572779d4..af9f22dbe31e7 100644 --- a/br/pkg/lightning/errormanager/errormanager.go +++ b/br/pkg/lightning/errormanager/errormanager.go @@ -40,13 +40,14 @@ const ( CREATE SCHEMA IF NOT EXISTS %s; ` - syntaxErrorTableName = "syntax_error_v1" - typeErrorTableName = "type_error_v1" + syntaxErrorTableName = "syntax_error_v2" + typeErrorTableName = "type_error_v2" // ConflictErrorTableName is the table name for duplicate detection. - ConflictErrorTableName = "conflict_error_v1" + ConflictErrorTableName = "conflict_error_v1_2" createSyntaxErrorTable = ` CREATE TABLE IF NOT EXISTS %s.` + syntaxErrorTableName + ` ( + id bigint PRIMARY KEY AUTO_INCREMENT, task_id bigint NOT NULL, create_time datetime(6) NOT NULL DEFAULT now(6), table_name varchar(261) NOT NULL, @@ -59,6 +60,7 @@ const ( createTypeErrorTable = ` CREATE TABLE IF NOT EXISTS %s.` + typeErrorTableName + ` ( + id bigint PRIMARY KEY AUTO_INCREMENT, task_id bigint NOT NULL, create_time datetime(6) NOT NULL DEFAULT now(6), table_name varchar(261) NOT NULL, @@ -71,6 +73,7 @@ const ( createConflictErrorTable = ` CREATE TABLE IF NOT EXISTS %s.` + ConflictErrorTableName + ` ( + id bigint PRIMARY KEY AUTO_INCREMENT, task_id bigint NOT NULL, create_time datetime(6) NOT NULL DEFAULT now(6), table_name varchar(261) NOT NULL, @@ -108,10 +111,10 @@ const ( sqlValuesConflictErrorIndex = "(?,?,?,?,?,?,?,?,?)" selectConflictKeys = ` - SELECT _tidb_rowid, raw_handle, raw_row + SELECT id, raw_handle, raw_row FROM %s.` + ConflictErrorTableName + ` - WHERE table_name = ? AND _tidb_rowid >= ? and _tidb_rowid < ? - ORDER BY _tidb_rowid LIMIT ?; + WHERE table_name = ? AND id >= ? and id < ? + ORDER BY id LIMIT ?; ` ) diff --git a/br/pkg/lightning/errormanager/errormanager_test.go b/br/pkg/lightning/errormanager/errormanager_test.go index 88808e35628b8..7902a16b4abf0 100644 --- a/br/pkg/lightning/errormanager/errormanager_test.go +++ b/br/pkg/lightning/errormanager/errormanager_test.go @@ -55,7 +55,7 @@ func TestInit(t *testing.T) { em.dupResolution = config.DupeResAlgRecord mock.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_errors`;"). WillReturnResult(sqlmock.NewResult(1, 1)) - mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_error_v1.*"). + mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_error_v1_2.*"). WillReturnResult(sqlmock.NewResult(2, 1)) err = em.Init(ctx) require.NoError(t, err) @@ -64,7 +64,7 @@ func TestInit(t *testing.T) { em.remainingError.Type.Store(1) mock.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_errors`;"). WillReturnResult(sqlmock.NewResult(3, 1)) - mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.type_error_v1.*"). + mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.type_error_v2.*"). WillReturnResult(sqlmock.NewResult(4, 1)) err = em.Init(ctx) require.NoError(t, err) @@ -72,9 +72,9 @@ func TestInit(t *testing.T) { em.remainingError.Type.Store(1) mock.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_errors`.*"). WillReturnResult(sqlmock.NewResult(5, 1)) - mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.type_error_v1.*"). + mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.type_error_v2.*"). WillReturnResult(sqlmock.NewResult(6, 1)) - mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_error_v1.*"). + mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_error_v1_2.*"). WillReturnResult(sqlmock.NewResult(7, 1)) err = em.Init(ctx) require.NoError(t, err) @@ -111,7 +111,7 @@ type mockRows struct { } func (r *mockRows) Columns() []string { - return []string{"_tidb_rowid", "raw_handle", "raw_row"} + return []string{"id", "raw_handle", "raw_row"} } func (r *mockRows) Close() error { return nil } @@ -120,7 +120,7 @@ func (r *mockRows) Next(dest []driver.Value) error { if r.start >= r.end { return io.EOF } - dest[0] = r.start // _tidb_rowid + dest[0] = r.start // id dest[1] = []byte{} // raw_handle dest[2] = []byte{} // raw_row r.start++ @@ -128,7 +128,7 @@ func (r *mockRows) Next(dest []driver.Value) error { } func (c mockConn) QueryContext(_ context.Context, query string, args []driver.NamedValue) (driver.Rows, error) { - expectedQuery := "SELECT _tidb_rowid, raw_handle, raw_row.*" + expectedQuery := "SELECT id, raw_handle, raw_row.*" if err := sqlmock.QueryMatcherRegexp.Match(expectedQuery, query); err != nil { return &mockRows{}, nil } @@ -241,7 +241,7 @@ func TestErrorMgrErrorOutput(t *testing.T) { em.remainingError.Syntax.Sub(1) output = em.Output() checkStr := strings.ReplaceAll(output, "\n", "") - expected := "Import Data Error Summary: +---+-------------+-------------+--------------------------------+| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |+---+-------------+-------------+--------------------------------+|\x1b[31m 1 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 1 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v1` \x1b[0m|+---+-------------+-------------+--------------------------------+" + expected := "Import Data Error Summary: +---+-------------+-------------+--------------------------------+| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |+---+-------------+-------------+--------------------------------+|\x1b[31m 1 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 1 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v2` \x1b[0m|+---+-------------+-------------+--------------------------------+" require.Equal(t, expected, checkStr) em.remainingError = cfg.App.MaxError @@ -249,7 +249,7 @@ func TestErrorMgrErrorOutput(t *testing.T) { em.remainingError.Type.Store(10) output = em.Output() checkStr = strings.ReplaceAll(output, "\n", "") - expected = "Import Data Error Summary: +---+-------------+-------------+--------------------------------+| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |+---+-------------+-------------+--------------------------------+|\x1b[31m 1 \x1b[0m|\x1b[31m Data Type \x1b[0m|\x1b[31m 90 \x1b[0m|\x1b[31m `error_info`.`type_error_v1` \x1b[0m||\x1b[31m 2 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 10 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v1` \x1b[0m|+---+-------------+-------------+--------------------------------+" + expected = "Import Data Error Summary: +---+-------------+-------------+--------------------------------+| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |+---+-------------+-------------+--------------------------------+|\x1b[31m 1 \x1b[0m|\x1b[31m Data Type \x1b[0m|\x1b[31m 90 \x1b[0m|\x1b[31m `error_info`.`type_error_v2` \x1b[0m||\x1b[31m 2 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 10 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v2` \x1b[0m|+---+-------------+-------------+--------------------------------+" require.Equal(t, expected, checkStr) // change multiple keys @@ -260,6 +260,6 @@ func TestErrorMgrErrorOutput(t *testing.T) { em.remainingError.Conflict.Store(0) output = em.Output() checkStr = strings.ReplaceAll(output, "\n", "") - expected = "Import Data Error Summary: +---+---------------------+-------------+----------------------------------+| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |+---+---------------------+-------------+----------------------------------+|\x1b[31m 1 \x1b[0m|\x1b[31m Data Type \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`type_error_v1` \x1b[0m||\x1b[31m 2 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v1` \x1b[0m||\x1b[31m 3 \x1b[0m|\x1b[31m Charset Error \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m \x1b[0m||\x1b[31m 4 \x1b[0m|\x1b[31m Unique Key Conflict \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`conflict_error_v1` \x1b[0m|+---+---------------------+-------------+----------------------------------+" + expected = "Import Data Error Summary: +---+---------------------+-------------+------------------------------------+| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |+---+---------------------+-------------+------------------------------------+|\x1b[31m 1 \x1b[0m|\x1b[31m Data Type \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`type_error_v2` \x1b[0m||\x1b[31m 2 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v2` \x1b[0m||\x1b[31m 3 \x1b[0m|\x1b[31m Charset Error \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m \x1b[0m||\x1b[31m 4 \x1b[0m|\x1b[31m Unique Key Conflict \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`conflict_error_v1_2` \x1b[0m|+---+---------------------+-------------+------------------------------------+" require.Equal(t, expected, checkStr) } diff --git a/br/tests/lightning_duplicate_detection/run.sh b/br/tests/lightning_duplicate_detection/run.sh index 6aace9b8ae5e4..9dd3ad5de056c 100644 --- a/br/tests/lightning_duplicate_detection/run.sh +++ b/br/tests/lightning_duplicate_detection/run.sh @@ -51,7 +51,7 @@ verify_detected_rows() { done done mapfile -t expect_rows < <(for row in "${expect_rows[@]}"; do echo "$row"; done | sort | uniq) - mapfile -t actual_rows < <(run_sql "SELECT row_data FROM lightning_task_info.conflict_error_v1 WHERE table_name = \"\`dup_detect\`.\`${table}\`\"" | + mapfile -t actual_rows < <(run_sql "SELECT row_data FROM lightning_task_info.conflict_error_v1_2 WHERE table_name = \"\`dup_detect\`.\`${table}\`\"" | grep "row_data:" | sed 's/^.*(//' | sed 's/).*$//' | sed 's/"//g' | sed 's/, */,/g' | sort | uniq) equal=0 if [ "${#actual_rows[@]}" = "${#expect_rows[@]}" ]; then diff --git a/br/tests/lightning_issue_40657/run.sh b/br/tests/lightning_issue_40657/run.sh index a20600b79d14b..6e29f3c024dd5 100644 --- a/br/tests/lightning_issue_40657/run.sh +++ b/br/tests/lightning_issue_40657/run.sh @@ -22,7 +22,7 @@ run_lightning -d "tests/$TEST_NAME/data1" run_sql 'admin check table test.t' run_sql 'select count(*) from test.t' check_contains 'count(*): 3' -run_sql 'select count(*) from lightning_task_info.conflict_error_v1' +run_sql 'select count(*) from lightning_task_info.conflict_error_v1_2' check_contains 'count(*): 2' run_sql 'truncate table test.t' diff --git a/br/tests/lightning_sqlmode/run.sh b/br/tests/lightning_sqlmode/run.sh index 81d44c2450d6d..a46681ae879e5 100755 --- a/br/tests/lightning_sqlmode/run.sh +++ b/br/tests/lightning_sqlmode/run.sh @@ -59,28 +59,28 @@ run_sql 'SELECT min(id), max(id) FROM sqlmodedb.t' check_contains 'min(id): 4' check_contains 'max(id): 4' -run_sql 'SELECT count(*) FROM sqlmodedb_lightning_task_info.type_error_v1' +run_sql 'SELECT count(*) FROM sqlmodedb_lightning_task_info.type_error_v2' check_contains 'count(*): 4' -run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v1 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(1,%";' +run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v2 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(1,%";' check_contains 'path: sqlmodedb.t.1.sql' check_contains 'offset: 53' check_contains 'cannot convert datum from unsigned bigint to type timestamp.' check_contains "row_data: (1,9,128,'too long','x,y,z')" -run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v1 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(2,%";' +run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v2 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(2,%";' check_contains 'path: sqlmodedb.t.1.sql' check_contains 'offset: 100' check_contains "Incorrect timestamp value: '2000-00-00 00:00:00'" check_contains "row_data: (2,'2000-00-00 00:00:00',-99999,'🤩',3)" -run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v1 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(3,%";' +run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v2 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(3,%";' check_contains 'path: sqlmodedb.t.1.sql' check_contains 'offset: 149' check_contains "Incorrect timestamp value: '9999-12-31 23:59:59'" check_contains "row_data: (3,'9999-12-31 23:59:59','NaN',x'99','x+y')" -run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v1 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(5,%";' +run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v2 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(5,%";' check_contains 'path: sqlmodedb.t.1.sql' check_contains 'offset: 237' check_contains "Column 'a' cannot be null" diff --git a/br/tidb-lightning.toml b/br/tidb-lightning.toml index 69e03440ecc1e..6335bf1d4db36 100644 --- a/br/tidb-lightning.toml +++ b/br/tidb-lightning.toml @@ -103,7 +103,7 @@ addr = "127.0.0.1:8287" # Current supports three resolution algorithms: # - none: doesn't detect duplicate records, which has the best performance of the three algorithms, but probably leads to # inconsistent data in the target TiDB. -# - record: only records duplicate records to `lightning_task_info.conflict_error_v1` table on the target TiDB. Note that this +# - record: only records duplicate records to `lightning_task_info.conflict_error_v1_2` table on the target TiDB. Note that this # required the version of target TiKV version is no less than v5.2.0, otherwise it will fallback to 'none'. # - remove: records all duplicate records like the 'record' algorithm and remove all duplicate records to ensure a consistent # state in the target TiDB. diff --git a/tools/check/check-bazel-prepare.sh b/tools/check/check-bazel-prepare.sh index 2a8d97ebdc767..21e44e1352cd5 100755 --- a/tools/check/check-bazel-prepare.sh +++ b/tools/check/check-bazel-prepare.sh @@ -19,7 +19,7 @@ # -o pipefail: sets the exit code of a pipeline to that of the rightmost command to exit with a non-zero status, # or to zero if all commands of the pipeline exit successfully. set -euo pipefail - +rm -rf /home/jenkins/.cache/bazel/_bazel_jenkins/install/a09dbb90c658248f08f9aa0eba11997d before_checksum=`find . -type f \( -name '*.bazel' -o -name '*.bzl' \) -exec md5sum {} \;| sort -k 2` make bazel_prepare after_checksum=`find . -type f \( -name '*.bazel' -o -name '*.bzl' \) -exec md5sum {} \;| sort -k 2`