diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 7462ace17616e..8dcc75aff38dc 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -74,6 +74,12 @@ const ( "You may check the metadata and continue by wait other task finish or manually delete the lock file " + truncateLockPath + " at the external storage." ) +const ( + waitInfoSchemaReloadCheckInterval = 1 * time.Second + // a million tables should take a few minutes to load all DDL change, making 15 to make sure we don't exit early + waitInfoSchemaReloadTimeout = 15 * time.Minute +) + var ( StreamStart = "log start" StreamStop = "log stop" @@ -1445,6 +1451,21 @@ func restoreStream( return errors.Annotate(err, "failed to restore kv files") } + // failpoint to stop for a while after restoring kvs + // this is to mimic the scenario that restore takes long time and the lease in schemaInfo has expired and needs refresh + failpoint.Inject("post-restore-kv-pending", func(val failpoint.Value) { + if val.(bool) { + // not ideal to use sleep but not sure what's the better way right now + log.Info("sleep after restoring kv") + time.Sleep(2 * time.Second) + } + }) + + // make sure schema reload finishes before proceeding + if err = waitUntilSchemaReload(ctx, client); err != nil { + return errors.Trace(err) + } + if err = client.CleanUpKVFiles(ctx); err != nil { return errors.Annotate(err, "failed to clean up") } @@ -1869,3 +1890,16 @@ func checkPiTRTaskInfo( return curTaskInfo, doFullRestore, nil } + +func waitUntilSchemaReload(ctx context.Context, client *restore.Client) error { + log.Info("waiting for schema info finishes reloading") + reloadStart := time.Now() + conditionFunc := func() bool { + return !client.GetDomain().IsLeaseExpired() + } + if err := utils.WaitUntil(ctx, conditionFunc, waitInfoSchemaReloadCheckInterval, waitInfoSchemaReloadTimeout); err != nil { + return errors.Annotate(err, "failed to wait until schema reload") + } + log.Info("reloading schema finished", zap.Duration("timeTaken", time.Since(reloadStart))) + return nil +} diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel index d776456ca914b..808f9638e3616 100644 --- a/br/pkg/utils/BUILD.bazel +++ b/br/pkg/utils/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "safe_point.go", "schema.go", "store_manager.go", + "wait.go", "worker.go", ], importpath = "github.com/pingcap/tidb/br/pkg/utils", diff --git a/br/pkg/utils/wait.go b/br/pkg/utils/wait.go new file mode 100644 index 0000000000000..5e5616eafb9eb --- /dev/null +++ b/br/pkg/utils/wait.go @@ -0,0 +1,49 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "context" + "time" + + "github.com/pingcap/errors" +) + +func WaitUntil(ctx context.Context, condition func() bool, checkInterval, maxTimeout time.Duration) error { + // do a quick check before starting the ticker + if condition() { + return nil + } + + timeoutCtx, cancel := context.WithTimeout(ctx, maxTimeout) + defer cancel() + + ticker := time.NewTicker(checkInterval) + defer ticker.Stop() + + for { + select { + case <-timeoutCtx.Done(): + if ctx.Err() != nil { + return ctx.Err() + } + return errors.Errorf("waitUntil timed out after waiting for %v", maxTimeout) + case <-ticker.C: + if condition() { + return nil + } + } + } +} diff --git a/br/tests/br_pitr_long_running_schema_loading/run.sh b/br/tests/br_pitr_long_running_schema_loading/run.sh new file mode 100644 index 0000000000000..e6f32c08bcdce --- /dev/null +++ b/br/tests/br_pitr_long_running_schema_loading/run.sh @@ -0,0 +1,51 @@ +#!/bin/bash +# +# Copyright 2024 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eu +. run_services +CUR=$(cd `dirname $0`; pwd) + +TASK_NAME="pitr_long_running_schema_loading" +res_file="$TEST_DIR/sql_res.$TEST_NAME.txt" +DB="$TEST_NAME" + +restart_services + +run_sql "CREATE SCHEMA $DB;" + +# start the log backup +run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log" + +run_sql "USE $DB; CREATE TABLE t1 (id INT PRIMARY KEY, value VARCHAR(255));" +run_sql "USE $DB; INSERT INTO t1 VALUES (1, 'before-backup-1'), (2, 'before-backup-2');" + + +# do a full backup +run_br --pd "$PD_ADDR" backup full -s "local://$TEST_DIR/$TASK_NAME/full" + +run_sql "USE $DB; INSERT INTO t1 VALUES (3, 'after-backup-1'), (4, 'after-backup-2');" +run_sql "USE $DB; DROP TABLE t1;" +run_sql "USE $DB; CREATE TABLE t2 (id INT PRIMARY KEY, data TEXT);" +run_sql "USE $DB; INSERT INTO t2 VALUES (1, 'new-table-data');" + +echo "wait checkpoint advance" +. "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance $TASK_NAME + +restart_services + +export GO_FAILPOINTS="github.com/pingcap/tidb/pkg/domain/mock-load-schema-long-time=return(true);github.com/pingcap/tidb/br/pkg/task/post-restore-kv-pending=return(true)" +run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" +export GO_FAILPOINTS="" diff --git a/br/tests/run_group_br_tests.sh b/br/tests/run_group_br_tests.sh index 2379549727f4a..008b0e63d9fbe 100755 --- a/br/tests/run_group_br_tests.sh +++ b/br/tests/run_group_br_tests.sh @@ -22,7 +22,7 @@ declare -A groups groups=( ["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable br_history br_gcs br_rawkv" ["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full br_table_partition br_full_ddl" - ["G02"]="br_full_cluster_restore br_full_index br_incremental_ddl br_pitr_failpoint" + ["G02"]="br_full_cluster_restore br_full_index br_incremental_ddl br_pitr_failpoint br_pitr_long_running_schema_loading" ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_index br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index br_other br_partition_add_index br_tidb_placement_policy br_tiflash' ["G04"]='br_range br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table' ["G05"]='br_skip_checksum br_small_batch_size br_split_region_fail br_systables br_table_filter br_txn br_stats br_clustered_index br_crypter' diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 17d6e81d18f68..40eafa98a2a64 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -291,6 +291,16 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i // We can fall back to full load, don't need to return the error. logutil.BgLogger().Error("failed to load schema diff", zap.Error(err)) } + + // add failpoint to simulate long-running schema loading scenario + failpoint.Inject("mock-load-schema-long-time", func(val failpoint.Value) { + if val.(bool) { + // not ideal to use sleep, but not sure if there is a better way + logutil.BgLogger().Error("sleep before doing a full load") + time.Sleep(15 * time.Second) + } + }) + // full load. schemas, err := do.fetchAllSchemasWithTables(m) if err != nil { @@ -1328,6 +1338,11 @@ func (do *Domain) Init( return nil } +// IsLeaseExpired returns whether lease has expired +func (do *Domain) IsLeaseExpired() bool { + return do.SchemaValidator.IsLeaseExpired() +} + // InitInfo4Test init infosync for distributed execution test. func (do *Domain) InitInfo4Test() { infosync.MockGlobalServerInfoManagerEntry.Add(do.ddl.GetID(), do.ServerID) diff --git a/pkg/domain/schema_validator.go b/pkg/domain/schema_validator.go index c018085d0141f..0104b11f54b76 100644 --- a/pkg/domain/schema_validator.go +++ b/pkg/domain/schema_validator.go @@ -57,6 +57,8 @@ type SchemaValidator interface { Reset() // IsStarted indicates whether SchemaValidator is started. IsStarted() bool + // IsLeaseExpired checks whether the current lease has expired + IsLeaseExpired() bool } type deltaSchemaInfo struct { @@ -172,6 +174,10 @@ func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, cha } } +func (s *schemaValidator) IsLeaseExpired() bool { + return time.Now().After(s.latestSchemaExpire) +} + // isRelatedTablesChanged returns the result whether relatedTableIDs is changed // from usedVer to the latest schema version. // NOTE, this function should be called under lock!