diff --git a/pkg/ddl/tests/metadatalock/BUILD.bazel b/pkg/ddl/tests/metadatalock/BUILD.bazel
index bc8fc8a444520..e0439b0fea460 100644
--- a/pkg/ddl/tests/metadatalock/BUILD.bazel
+++ b/pkg/ddl/tests/metadatalock/BUILD.bazel
@@ -8,16 +8,18 @@ go_test(
         "mdl_test.go",
     ],
     flaky = True,
-    shard_count = 36,
+    shard_count = 37,
     deps = [
         "//pkg/config",
         "//pkg/ddl",
         "//pkg/ddl/ingest/testutil",
         "//pkg/errno",
+        "//pkg/meta/model",
         "//pkg/server",
         "//pkg/testkit",
         "//pkg/testkit/testfailpoint",
         "//pkg/testkit/testsetup",
+        "@com_github_pingcap_failpoint//:failpoint",
         "@com_github_stretchr_testify//require",
         "@org_uber_go_goleak//:goleak",
     ],
diff --git a/pkg/ddl/tests/metadatalock/mdl_test.go b/pkg/ddl/tests/metadatalock/mdl_test.go
index 262cb745e7a63..232f10383d7c6 100644
--- a/pkg/ddl/tests/metadatalock/mdl_test.go
+++ b/pkg/ddl/tests/metadatalock/mdl_test.go
@@ -15,13 +15,17 @@
 package metadatalocktest
 
 import (
+	"context"
 	"fmt"
 	"sync"
 	"testing"
 	"time"
 
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/tidb/pkg/ddl"
 	ingesttestutil "github.com/pingcap/tidb/pkg/ddl/ingest/testutil"
 	mysql "github.com/pingcap/tidb/pkg/errno"
+	"github.com/pingcap/tidb/pkg/meta/model"
 	"github.com/pingcap/tidb/pkg/server"
 	"github.com/pingcap/tidb/pkg/testkit"
 	"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
@@ -997,6 +1001,103 @@ func TestMDLPreparePlanCacheExecute2(t *testing.T) {
 	tk.MustExec("admin check table t")
 }
 
+// TestMDLPreparePlanCacheExecuteInsert makes sure the insert statement handle the schema correctly in plan cache.
+func TestMDLPreparePlanCacheExecuteInsert(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	defer ingesttestutil.InjectMockBackendMgr(t, store)()
+
+	sv := server.CreateMockServer(t, store)
+
+	sv.SetDomain(dom)
+	dom.InfoSyncer().SetSessionManager(sv)
+	defer sv.Close()
+
+	conn1 := server.CreateMockConn(t, sv)
+	tk := testkit.NewTestKitWithSession(t, store, conn1.Context().Session)
+	conn2 := server.CreateMockConn(t, sv)
+	tkDDL := testkit.NewTestKitWithSession(t, store, conn2.Context().Session)
+	conn3 := server.CreateMockConn(t, sv)
+	tk3 := testkit.NewTestKitWithSession(t, store, conn3.Context().Session)
+	tk.MustExec("use test")
+	tk.MustExec("set global tidb_enable_metadata_lock=1")
+	tk.MustExec("create table t(a int primary key, b int);")
+	tk.MustExec("create table t2(a int);")
+	tk.MustExec("insert into t values(1, 1), (2, 2), (3, 3), (4, 4);")
+
+	tk.MustExec(`begin`)
+	tk.MustExec(`prepare delete_stmt from 'delete from t where a = ?'`)
+	tk.MustExec(`prepare insert_stmt from 'insert into t values (?, ?)'`)
+	tk.MustExec(`commit`)
+
+	tk.MustExec(`begin`)
+	tk.MustExec(`set @a = 4, @b= 4;`)
+	tk.MustExec(`execute delete_stmt using @a;`)
+	tk.MustExec(`execute insert_stmt using @a, @b;`)
+	tk.MustExec(`commit`)
+
+	tk.MustExec("begin")
+
+	ch := make(chan struct{})
+
+	first := true
+	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobUpdated", func(job *model.Job) {
+		switch job.SchemaState {
+		case model.StateWriteReorganization:
+			tbl, _ := dom.InfoSchema().TableByID(context.Background(), job.TableID)
+			idx := tbl.Meta().FindIndexByName("idx")
+			switch idx.BackfillState {
+			case model.BackfillStateRunning:
+				if first {
+					tk.MustExec(`begin`)
+					tk.MustExec(`set @a=9;`)
+					tk.MustExec(`execute delete_stmt using @a;`)
+					tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("0"))
+					tk.MustExec(`set @a=6, @b=4;`)
+					tk.MustExec(`execute insert_stmt using @a, @b;`)
+					tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("0"))
+					tk.MustExec(`commit`)
+					tk.MustExec(`begin`)
+					tk.MustExec(`set @a=4;`)
+					tk.MustExec(`execute delete_stmt using @a;`)
+					tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("1"))
+					tk.MustExec(`set @a=4, @b=4;`)
+					tk.MustExec(`execute insert_stmt using @a, @b;`)
+					tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("0"))
+					tk.MustExec(`commit`)
+
+					tk.MustExec("begin")
+					// Activate txn.
+					tk.MustExec("select * from t2")
+					first = false
+					tk3.MustExec("insert into test.t values(10000, 1000)")
+					return
+				}
+			}
+		}
+	})
+
+	ddl.MockDMLExecutionMerging = func() {
+		tk.MustExec(`execute delete_stmt using @a;`)
+		tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("0"))
+		tk.MustExec(`execute insert_stmt using @a, @b;`)
+		tk.MustExec("commit")
+	}
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockDMLExecutionMerging", "1*return(true)->return(false)"))
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		<-ch
+		tkDDL.MustExec("alter table test.t add index idx(a);")
+		wg.Done()
+	}()
+
+	ch <- struct{}{}
+	wg.Wait()
+
+	tk.MustExec("admin check table t")
+}
+
 func TestMDLDisable2Enable(t *testing.T) {
 	store, dom := testkit.CreateMockStoreAndDomain(t)
 	sv := server.CreateMockServer(t, store)
diff --git a/pkg/planner/core/plan_cache.go b/pkg/planner/core/plan_cache.go
index 1f948b42c1d41..a9091d51b47dd 100644
--- a/pkg/planner/core/plan_cache.go
+++ b/pkg/planner/core/plan_cache.go
@@ -16,6 +16,7 @@ package core
 
 import (
 	"context"
+	"math"
 	"time"
 
 	"github.com/pingcap/errors"
@@ -39,6 +40,8 @@ import (
 	"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
 	"github.com/pingcap/tidb/pkg/util/hint"
 	"github.com/pingcap/tidb/pkg/util/intest"
+	"github.com/pingcap/tidb/pkg/util/logutil"
+	"go.uber.org/zap"
 )
 
 // PlanCacheKeyTestIssue43667 is only for test.
@@ -115,23 +118,23 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
 			if err != nil {
 				return plannererrors.ErrSchemaChanged.GenWithStack("Schema change caused error: %s", err.Error())
 			}
+			// Table ID is changed, for example, drop & create table, truncate table.
 			delete(stmt.RelateVersion, stmt.tbls[i].Meta().ID)
-			stmt.tbls[i] = tblByName
-			stmt.RelateVersion[tblByName.Meta().ID] = tblByName.Meta().Revision
+			tbl = tblByName
 		}
-		newTbl, err := tryLockMDLAndUpdateSchemaIfNecessary(ctx, sctx.GetPlanCtx(), stmt.dbName[i], stmt.tbls[i], is)
+		// newTbl is the 'should be used' table info for this execution.
+		newTbl, err := tryLockMDLAndUpdateSchemaIfNecessary(ctx, sctx.GetPlanCtx(), stmt.dbName[i], tbl, is)
 		if err != nil {
+			logutil.BgLogger().Warn("meet error during tryLockMDLAndUpdateSchemaIfNecessary", zap.String("table name", tbl.Meta().Name.String()), zap.Error(err))
+			// Invalid the cache key related fields to avoid using plan cache.
+			stmt.RelateVersion[tbl.Meta().ID] = math.MaxUint64
 			schemaNotMatch = true
 			continue
 		}
-		// The revision of tbl and newTbl may not be the same.
-		// Example:
-		// The version of stmt.tbls[i] is taken from the prepare statement and is revision v1.
-		// When stmt.tbls[i] is locked in MDL, the revision of newTbl is also v1.
-		// The revision of tbl is v2. The reason may have other statements trigger "tryLockMDLAndUpdateSchemaIfNecessary" before, leading to tbl revision update.
-		if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision || (tbl != nil && tbl.Meta().Revision != newTbl.Meta().Revision) {
+		if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision {
 			schemaNotMatch = true
 		}
+		// Update the cache key related fields.
 		stmt.tbls[i] = newTbl
 		stmt.RelateVersion[newTbl.Meta().ID] = newTbl.Meta().Revision
 	}