Skip to content

Commit c72c2a7

Browse files
committed
br: fix insert gc failed due to slow schema reload (pingcap#57742)
close pingcap#57743 (cherry picked from commit 8fe0618)
1 parent 65fd2ad commit c72c2a7

File tree

7 files changed

+157
-1
lines changed

7 files changed

+157
-1
lines changed

br/pkg/task/stream.go

+34
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,12 @@ const (
7474
"You may check the metadata and continue by wait other task finish or manually delete the lock file " + truncateLockPath + " at the external storage."
7575
)
7676

77+
const (
78+
waitInfoSchemaReloadCheckInterval = 1 * time.Second
79+
// a million tables should take a few minutes to load all DDL change, making 15 to make sure we don't exit early
80+
waitInfoSchemaReloadTimeout = 15 * time.Minute
81+
)
82+
7783
var (
7884
StreamStart = "log start"
7985
StreamStop = "log stop"
@@ -1445,6 +1451,21 @@ func restoreStream(
14451451
return errors.Annotate(err, "failed to restore kv files")
14461452
}
14471453

1454+
// failpoint to stop for a while after restoring kvs
1455+
// this is to mimic the scenario that restore takes long time and the lease in schemaInfo has expired and needs refresh
1456+
failpoint.Inject("post-restore-kv-pending", func(val failpoint.Value) {
1457+
if val.(bool) {
1458+
// not ideal to use sleep but not sure what's the better way right now
1459+
log.Info("sleep after restoring kv")
1460+
time.Sleep(2 * time.Second)
1461+
}
1462+
})
1463+
1464+
// make sure schema reload finishes before proceeding
1465+
if err = waitUntilSchemaReload(ctx, client); err != nil {
1466+
return errors.Trace(err)
1467+
}
1468+
14481469
if err = client.CleanUpKVFiles(ctx); err != nil {
14491470
return errors.Annotate(err, "failed to clean up")
14501471
}
@@ -1869,3 +1890,16 @@ func checkPiTRTaskInfo(
18691890

18701891
return curTaskInfo, doFullRestore, nil
18711892
}
1893+
1894+
func waitUntilSchemaReload(ctx context.Context, client *logclient.LogClient) error {
1895+
log.Info("waiting for schema info finishes reloading")
1896+
reloadStart := time.Now()
1897+
conditionFunc := func() bool {
1898+
return !client.GetDomain().IsLeaseExpired()
1899+
}
1900+
if err := utils.WaitUntil(ctx, conditionFunc, waitInfoSchemaReloadCheckInterval, waitInfoSchemaReloadTimeout); err != nil {
1901+
return errors.Annotate(err, "failed to wait until schema reload")
1902+
}
1903+
log.Info("reloading schema finished", zap.Duration("timeTaken", time.Since(reloadStart)))
1904+
return nil
1905+
}

br/pkg/utils/BUILD.bazel

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ go_library(
1919
"safe_point.go",
2020
"schema.go",
2121
"store_manager.go",
22+
"wait.go",
2223
"worker.go",
2324
],
2425
importpath = "github.com/pingcap/tidb/br/pkg/utils",

br/pkg/utils/wait.go

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package utils
16+
17+
import (
18+
"context"
19+
"time"
20+
21+
"github.com/pingcap/errors"
22+
)
23+
24+
func WaitUntil(ctx context.Context, condition func() bool, checkInterval, maxTimeout time.Duration) error {
25+
// do a quick check before starting the ticker
26+
if condition() {
27+
return nil
28+
}
29+
30+
timeoutCtx, cancel := context.WithTimeout(ctx, maxTimeout)
31+
defer cancel()
32+
33+
ticker := time.NewTicker(checkInterval)
34+
defer ticker.Stop()
35+
36+
for {
37+
select {
38+
case <-timeoutCtx.Done():
39+
if ctx.Err() != nil {
40+
return ctx.Err()
41+
}
42+
return errors.Errorf("waitUntil timed out after waiting for %v", maxTimeout)
43+
case <-ticker.C:
44+
if condition() {
45+
return nil
46+
}
47+
}
48+
}
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#!/bin/bash
2+
#
3+
# Copyright 2024 PingCAP, Inc.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
set -eu
18+
. run_services
19+
CUR=$(cd `dirname $0`; pwd)
20+
21+
TASK_NAME="pitr_long_running_schema_loading"
22+
res_file="$TEST_DIR/sql_res.$TEST_NAME.txt"
23+
DB="$TEST_NAME"
24+
25+
restart_services
26+
27+
run_sql "CREATE SCHEMA $DB;"
28+
29+
# start the log backup
30+
run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log"
31+
32+
run_sql "USE $DB; CREATE TABLE t1 (id INT PRIMARY KEY, value VARCHAR(255));"
33+
run_sql "USE $DB; INSERT INTO t1 VALUES (1, 'before-backup-1'), (2, 'before-backup-2');"
34+
35+
36+
# do a full backup
37+
run_br --pd "$PD_ADDR" backup full -s "local://$TEST_DIR/$TASK_NAME/full"
38+
39+
run_sql "USE $DB; INSERT INTO t1 VALUES (3, 'after-backup-1'), (4, 'after-backup-2');"
40+
run_sql "USE $DB; DROP TABLE t1;"
41+
run_sql "USE $DB; CREATE TABLE t2 (id INT PRIMARY KEY, data TEXT);"
42+
run_sql "USE $DB; INSERT INTO t2 VALUES (1, 'new-table-data');"
43+
44+
echo "wait checkpoint advance"
45+
. "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance $TASK_NAME
46+
47+
restart_services
48+
49+
export GO_FAILPOINTS="github.com/pingcap/tidb/pkg/domain/mock-load-schema-long-time=return(true);github.com/pingcap/tidb/br/pkg/task/post-restore-kv-pending=return(true)"
50+
run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full"
51+
export GO_FAILPOINTS=""

br/tests/run_group_br_tests.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ declare -A groups
2222
groups=(
2323
["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable br_history br_gcs br_rawkv"
2424
["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full br_table_partition br_full_ddl"
25-
["G02"]="br_full_cluster_restore br_full_index br_incremental_ddl br_pitr_failpoint"
25+
["G02"]="br_full_cluster_restore br_full_index br_incremental_ddl br_pitr_failpoint br_pitr_long_running_schema_loading"
2626
["G03"]='br_incompatible_tidb_config br_incremental br_incremental_index br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index br_other br_partition_add_index br_tidb_placement_policy br_tiflash'
2727
["G04"]='br_range br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table'
2828
["G05"]='br_skip_checksum br_small_batch_size br_split_region_fail br_systables br_table_filter br_txn br_stats br_clustered_index br_crypter'

pkg/domain/domain.go

+15
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,16 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
291291
// We can fall back to full load, don't need to return the error.
292292
logutil.BgLogger().Error("failed to load schema diff", zap.Error(err))
293293
}
294+
295+
// add failpoint to simulate long-running schema loading scenario
296+
failpoint.Inject("mock-load-schema-long-time", func(val failpoint.Value) {
297+
if val.(bool) {
298+
// not ideal to use sleep, but not sure if there is a better way
299+
logutil.BgLogger().Error("sleep before doing a full load")
300+
time.Sleep(15 * time.Second)
301+
}
302+
})
303+
294304
// full load.
295305
schemas, err := do.fetchAllSchemasWithTables(m)
296306
if err != nil {
@@ -1328,6 +1338,11 @@ func (do *Domain) Init(
13281338
return nil
13291339
}
13301340

1341+
// IsLeaseExpired returns whether lease has expired
1342+
func (do *Domain) IsLeaseExpired() bool {
1343+
return do.SchemaValidator.IsLeaseExpired()
1344+
}
1345+
13311346
// InitInfo4Test init infosync for distributed execution test.
13321347
func (do *Domain) InitInfo4Test() {
13331348
infosync.MockGlobalServerInfoManagerEntry.Add(do.ddl.GetID(), do.ServerID)

pkg/domain/schema_validator.go

+6
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ type SchemaValidator interface {
5757
Reset()
5858
// IsStarted indicates whether SchemaValidator is started.
5959
IsStarted() bool
60+
// IsLeaseExpired checks whether the current lease has expired
61+
IsLeaseExpired() bool
6062
}
6163

6264
type deltaSchemaInfo struct {
@@ -172,6 +174,10 @@ func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, cha
172174
}
173175
}
174176

177+
func (s *schemaValidator) IsLeaseExpired() bool {
178+
return time.Now().After(s.latestSchemaExpire)
179+
}
180+
175181
// isRelatedTablesChanged returns the result whether relatedTableIDs is changed
176182
// from usedVer to the latest schema version.
177183
// NOTE, this function should be called under lock!

0 commit comments

Comments
 (0)