Skip to content

Commit

Permalink
planner: fix incorrectly using the schema for plan cache (#57964) (#5…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Dec 12, 2024
1 parent 0b9e7f6 commit 0728f3f
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 10 deletions.
4 changes: 3 additions & 1 deletion pkg/ddl/tests/metadatalock/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ go_test(
"mdl_test.go",
],
flaky = True,
shard_count = 36,
shard_count = 37,
deps = [
"//pkg/config",
"//pkg/ddl",
"//pkg/ddl/ingest/testutil",
"//pkg/ddl/util/callback",
"//pkg/errno",
"//pkg/parser/model",
"//pkg/server",
"//pkg/testkit",
"//pkg/testkit/testsetup",
Expand Down
83 changes: 83 additions & 0 deletions pkg/ddl/tests/metadatalock/mdl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl"
ingesttestutil "github.com/pingcap/tidb/pkg/ddl/ingest/testutil"
"github.com/pingcap/tidb/pkg/ddl/util/callback"
mysql "github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/server"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -997,6 +1000,86 @@ func TestMDLPreparePlanCacheExecute2(t *testing.T) {
tk.MustExec("admin check table t")
}

// TestMDLPreparePlanCacheExecuteInsert makes sure the insert statement handle the schema correctly in plan cache.
func TestMDLPreparePlanCacheExecuteInsert(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
defer ingesttestutil.InjectMockBackendMgr(t, store)()

sv := server.CreateMockServer(t, store)

sv.SetDomain(dom)
dom.InfoSyncer().SetSessionManager(sv)
defer sv.Close()

conn1 := server.CreateMockConn(t, sv)
tk := testkit.NewTestKitWithSession(t, store, conn1.Context().Session)
conn2 := server.CreateMockConn(t, sv)
tkDDL := testkit.NewTestKitWithSession(t, store, conn2.Context().Session)
tk.MustExec("use test")
tk.MustExec("set global tidb_enable_metadata_lock=1")
tk.MustExec("create table t(a int primary key, b int);")
tk.MustExec("create table t2(a int);")
tk.MustExec("insert into t values(1, 1), (2, 2), (3, 3), (4, 4);")

tk.MustExec(`prepare insert_stmt from 'insert into t values (?, ?)'`)
tk.MustExec(`set @a=4, @b=4;`)

ch := make(chan struct{})

first := true
callback := &callback.TestDDLCallback{Do: dom}
onJobUpdatedExported := func(job *model.Job) {
switch job.SchemaState {
case model.StateWriteReorganization:
tbl, _ := dom.InfoSchema().TableByID(job.TableID)
idx := tbl.Meta().FindIndexByName("idx")
switch idx.BackfillState {
case model.BackfillStateRunning:
if first {
// generate plan, cache it, and make some row change to make
// sure backfill state 'merging' is not skipped.
tk.MustExec(`begin`)
tk.MustExec(`delete from t where a = 4;`)
tk.MustExec(`execute insert_stmt using @a, @b;`)
tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("0"))
tk.MustExec(`commit`)

tk.MustExec("begin")
// Activate txn.
tk.MustExec("select * from t2")
first = false
return
}
}
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedExported)
dom.DDL().SetHook(callback)

ddl.MockDMLExecutionMerging = func() {
tk.MustExec(`delete from t where a = 4;`)
// we must generate a new plan here, because the schema has changed since
// the last plan was generated.
tk.MustExec(`execute insert_stmt using @a, @b;`)
tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("0"))
tk.MustExec("commit")
}
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockDMLExecutionMerging", "1*return(true)->return(false)"))

var wg sync.WaitGroup
wg.Add(1)
go func() {
<-ch
tkDDL.MustExec("alter table test.t add index idx(a);")
wg.Done()
}()

ch <- struct{}{}
wg.Wait()

tk.MustExec("admin check table t")
}

func TestMDLDisable2Enable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
sv := server.CreateMockServer(t, store)
Expand Down
21 changes: 12 additions & 9 deletions pkg/planner/core/plan_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package core
import (
"bytes"
"context"
"math"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/bindinfo"
Expand All @@ -40,8 +41,10 @@ import (
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/collate"
"github.com/pingcap/tidb/pkg/util/kvcache"
"github.com/pingcap/tidb/pkg/util/logutil"
utilpc "github.com/pingcap/tidb/pkg/util/plancache"
"github.com/pingcap/tidb/pkg/util/ranger"
"go.uber.org/zap"
)

// PlanCacheKeyTestIssue43667 is only for test.
Expand Down Expand Up @@ -112,23 +115,23 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
if err != nil {
return ErrSchemaChanged.GenWithStack("Schema change caused error: %s", err.Error())
}
// Table ID is changed, for example, drop & create table, truncate table.
delete(stmt.RelateVersion, stmt.tbls[i].Meta().ID)
stmt.tbls[i] = tblByName
stmt.RelateVersion[tblByName.Meta().ID] = tblByName.Meta().Revision
tbl = tblByName
}
newTbl, err := tryLockMDLAndUpdateSchemaIfNecessary(sctx, stmt.dbName[i], stmt.tbls[i], is)
// newTbl is the 'should be used' table info for this execution.
newTbl, err := tryLockMDLAndUpdateSchemaIfNecessary(sctx, stmt.dbName[i], tbl, is)
if err != nil {
logutil.BgLogger().Warn("meet error during tryLockMDLAndUpdateSchemaIfNecessary", zap.String("table name", tbl.Meta().Name.String()), zap.Error(err))
// Invalid the cache key related fields to avoid using plan cache.
stmt.RelateVersion[tbl.Meta().ID] = math.MaxUint64
schemaNotMatch = true
continue
}
// The revision of tbl and newTbl may not be the same.
// Example:
// The version of stmt.tbls[i] is taken from the prepare statement and is revision v1.
// When stmt.tbls[i] is locked in MDL, the revision of newTbl is also v1.
// The revision of tbl is v2. The reason may have other statements trigger "tryLockMDLAndUpdateSchemaIfNecessary" before, leading to tbl revision update.
if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision || (tbl != nil && tbl.Meta().Revision != newTbl.Meta().Revision) {
if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision {
schemaNotMatch = true
}
// Update the cache key related fields.
stmt.tbls[i] = newTbl
stmt.RelateVersion[newTbl.Meta().ID] = newTbl.Meta().Revision
}
Expand Down

0 comments on commit 0728f3f

Please sign in to comment.