Skip to content

Commit

Permalink
Add linearizable support to SQL VSchema management (#17401)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Jan 23, 2025
1 parent 1df1dc8 commit a6c2896
Show file tree
Hide file tree
Showing 39 changed files with 1,047 additions and 561 deletions.
11 changes: 7 additions & 4 deletions go/cmd/vtcombo/cli/vschema_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,20 @@ func loadKeyspacesFromDir(ctx context.Context, dir string, ts *topo.Server) {
log.Fatalf("Unable to read keyspace file %v: %v", ksFile, err)
}

keyspace := &vschemapb.Keyspace{}
err = json.Unmarshal(jsonData, keyspace)
ksvs := &topo.KeyspaceVSchemaInfo{
Name: ks.Name,
Keyspace: &vschemapb.Keyspace{},
}
err = json.Unmarshal(jsonData, ksvs.Keyspace)
if err != nil {
log.Fatalf("Unable to parse keyspace file %v: %v", ksFile, err)
}

_, err = vindexes.BuildKeyspace(keyspace, env.Parser())
_, err = vindexes.BuildKeyspace(ksvs.Keyspace, env.Parser())
if err != nil {
log.Fatalf("Invalid keyspace definition: %v", err)
}
ts.SaveVSchema(ctx, ks.Name, keyspace)
ts.SaveVSchema(ctx, ksvs)
log.Infof("Loaded keyspace %v from %v\n", ks.Name, ksFile)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ var (
PRIMARY KEY (id)
) Engine=InnoDB;`
vschemaDDL = "alter vschema create vindex test_vdx using hash"
vschemaDDLError = fmt.Sprintf("Error 1105 (HY000): cannot perform Update on keyspaces/%s/VSchema as the topology server connection is read-only",
keyspaceUnshardedName)
vschemaDDLError = "Error 1105 (HY000): cannot update VSchema as the topology server connection is read-only"
)

// createConfig creates a config file in TmpDir in vtdataroot and writes the given data.
Expand Down
81 changes: 80 additions & 1 deletion go/test/endtoend/vtgate/vschema/vschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,20 @@ import (
"fmt"
"os"
"path"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/vtgate"
)

var (
Expand Down Expand Up @@ -164,14 +169,88 @@ func TestVSchema(t *testing.T) {
utils.AssertMatches(t, conn, "delete from vt_user", `[]`)

if utils.BinaryIsAtLeastAtVersion(22, "vtgate") {
// Don't allow any users to modify the vschema via the SQL API
// in order to test that behavior.
writeConfig(configFile, map[string]string{
"vschema_ddl_authorized_users": "",
})

// Allow anyone to modify the vschema via the SQL API again when
// the test completes.
defer func() {
writeConfig(configFile, map[string]string{
"vschema_ddl_authorized_users": "%",
})
}()
require.EventuallyWithT(t, func(t *assert.CollectT) {
_, err = conn.ExecuteFetch("ALTER VSCHEMA DROP TABLE main", 1000, false)
assert.Error(t, err)
assert.ErrorContains(t, err, "is not authorized to perform vschema operations")
}, 5*time.Second, 100*time.Millisecond)
}
}

// TestVSchemaSQLAPIConcurrency tests that we prevent lost writes when we have
// concurrent vschema changes being made via the SQL API.
func TestVSchemaSQLAPIConcurrency(t *testing.T) {
if !utils.BinaryIsAtLeastAtVersion(22, "vtgate") {
t.Skip("This test requires vtgate version 22 or higher")
}

ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
defer cancel()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

initialVSchema, err := conn.ExecuteFetch("SHOW VSCHEMA TABLES", -1, false)
require.NoError(t, err)
baseTableName := "t"
numTables := 1000
mysqlConns := make([]*mysql.Conn, numTables)
for i := 0; i < numTables; i++ {
c, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
mysqlConns[i] = c
defer c.Close()
}

isVersionMismatchErr := func(err error) bool {
// The error we get is an SQL error so we have to do string matching.
return err != nil && strings.Contains(err.Error(), vtgate.ErrStaleVSchema.Error())
}

wg := sync.WaitGroup{}
preventedLostWrites := atomic.Bool{}
for i := 0; i < numTables; i++ {
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(time.Duration(rand.Intn(100) * int(time.Nanosecond)))
tableName := fmt.Sprintf("%s%d", baseTableName, i)
_, err = mysqlConns[i].ExecuteFetch(fmt.Sprintf("ALTER VSCHEMA ADD TABLE %s", tableName), -1, false)
if isVersionMismatchErr(err) {
preventedLostWrites.Store(true)
} else {
require.NoError(t, err)
time.Sleep(time.Duration(rand.Intn(75) * int(time.Nanosecond)))
_, err = mysqlConns[i].ExecuteFetch(fmt.Sprintf("ALTER VSCHEMA DROP TABLE %s", tableName), -1, false)
if isVersionMismatchErr(err) {
preventedLostWrites.Store(true)
} else {
require.NoError(t, err)
}
}
}()
}
wg.Wait()
require.True(t, preventedLostWrites.Load())

// Cleanup any tables that were not dropped because the DROP query
// failed due to a bad node version.
for i := 0; i < numTables; i++ {
tableName := fmt.Sprintf("%s%d", baseTableName, i)
_, _ = mysqlConns[i].ExecuteFetch(fmt.Sprintf("ALTER VSCHEMA DROP TABLE %s", tableName), -1, false)
}
// Confirm that we're back to the initial state.
utils.AssertMatches(t, conn, "SHOW VSCHEMA TABLES", fmt.Sprintf("%v", initialVSchema.Rows))
}
2 changes: 1 addition & 1 deletion go/test/utils/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func MustMatchFn(ignoredFields ...string) func(t *testing.T, want, got any, errM
t.Helper()
diff := cmp.Diff(want, got, diffOpts...)
if diff != "" {
t.Fatalf("%v: (-want +got)\n%v", errMsg, diff)
require.FailNow(t, "%v: (-want +got)\n%v", errMsg, diff)
}
}
}
Expand Down
7 changes: 3 additions & 4 deletions go/vt/topo/helpers/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func CopyKeyspaces(ctx context.Context, fromTS, toTS *topo.Server, parser *sqlpa
}

for _, keyspace := range keyspaces {

ki, err := fromTS.GetKeyspace(ctx, keyspace)
if err != nil {
return fmt.Errorf("GetKeyspace(%v): %w", keyspace, err)
Expand All @@ -55,15 +54,15 @@ func CopyKeyspaces(ctx context.Context, fromTS, toTS *topo.Server, parser *sqlpa
}
}

vs, err := fromTS.GetVSchema(ctx, keyspace)
ksvs, err := fromTS.GetVSchema(ctx, keyspace)
switch {
case err == nil:
_, err = vindexes.BuildKeyspace(vs, parser)
_, err = vindexes.BuildKeyspace(ksvs.Keyspace, parser)
if err != nil {
log.Errorf("BuildKeyspace(%v): %v", keyspace, err)
break
}
if err := toTS.SaveVSchema(ctx, keyspace, vs); err != nil {
if err := toTS.SaveVSchema(ctx, ksvs); err != nil {
log.Errorf("SaveVSchema(%v): %v", keyspace, err)
}
case topo.IsErrType(err, topo.NoNode):
Expand Down
9 changes: 6 additions & 3 deletions go/vt/topo/srv_vschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,13 @@ func (ts *Server) RebuildSrvVSchema(ctx context.Context, cells []string) error {
go func(keyspace string) {
defer wg.Done()

k, err := ts.GetVSchema(ctx, keyspace)
ksvs, err := ts.GetVSchema(ctx, keyspace)
if IsErrType(err, NoNode) {
err = nil
k = &vschemapb.Keyspace{}
ksvs = &KeyspaceVSchemaInfo{
Name: keyspace,
Keyspace: &vschemapb.Keyspace{},
}
}

mu.Lock()
Expand All @@ -184,7 +187,7 @@ func (ts *Server) RebuildSrvVSchema(ctx context.Context, cells []string) error {
finalErr = err
return
}
srvVSchema.Keyspaces[keyspace] = k
srvVSchema.Keyspaces[keyspace] = ksvs.Keyspace
}(keyspace)
}
wg.Wait()
Expand Down
16 changes: 11 additions & 5 deletions go/vt/topo/test/vschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@ func checkVSchema(t *testing.T, ctx context.Context, ts *topo.Server) {
t.Error(err)
}

err = ts.SaveVSchema(ctx, "test_keyspace", &vschemapb.Keyspace{
Tables: map[string]*vschemapb.Table{
"unsharded": {},
err = ts.SaveVSchema(ctx, &topo.KeyspaceVSchemaInfo{
Name: "test_keyspace",
Keyspace: &vschemapb.Keyspace{
Tables: map[string]*vschemapb.Table{
"unsharded": {},
},
},
})
if err != nil {
Expand All @@ -64,8 +67,11 @@ func checkVSchema(t *testing.T, ctx context.Context, ts *topo.Server) {
t.Errorf("GetVSchema: %s, want %s", got, want)
}

err = ts.SaveVSchema(ctx, "test_keyspace", &vschemapb.Keyspace{
Sharded: true,
err = ts.SaveVSchema(ctx, &topo.KeyspaceVSchemaInfo{
Name: "test_keyspace",
Keyspace: &vschemapb.Keyspace{
Sharded: true,
},
})
require.NoError(t, err)

Expand Down
16 changes: 13 additions & 3 deletions go/vt/topo/topotests/srv_vschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand Down Expand Up @@ -76,7 +77,10 @@ func TestRebuildVSchema(t *testing.T) {
keyspace1 := &vschemapb.Keyspace{
Sharded: true,
}
if err := ts.SaveVSchema(ctx, "ks1", keyspace1); err != nil {
if err := ts.SaveVSchema(ctx, &topo.KeyspaceVSchemaInfo{
Name: "ks1",
Keyspace: keyspace1,
}); err != nil {
t.Fatalf("SaveVSchema(ks1) failed: %v", err)
}
if err := ts.RebuildSrvVSchema(ctx, cells); err != nil {
Expand Down Expand Up @@ -118,7 +122,10 @@ func TestRebuildVSchema(t *testing.T) {
},
},
}
if err := ts.SaveVSchema(ctx, "ks2", keyspace2); err != nil {
if err := ts.SaveVSchema(ctx, &topo.KeyspaceVSchemaInfo{
Name: "ks2",
Keyspace: keyspace2,
}); err != nil {
t.Fatalf("SaveVSchema(ks1) failed: %v", err)
}
if err := ts.RebuildSrvVSchema(ctx, []string{"cell1"}); err != nil {
Expand Down Expand Up @@ -182,7 +189,10 @@ func TestRebuildVSchema(t *testing.T) {
wanted4.RoutingRules = rr

// Delete a keyspace, checks vschema entry in map goes away.
if err := ts.SaveVSchema(ctx, "ks2", &vschemapb.Keyspace{}); err != nil {
if err := ts.SaveVSchema(ctx, &topo.KeyspaceVSchemaInfo{
Name: "ks2",
Keyspace: &vschemapb.Keyspace{},
}); err != nil {
t.Fatalf("SaveVSchema(ks1) failed: %v", err)
}
if err := ts.DeleteKeyspace(ctx, "ks2"); err != nil {
Expand Down
Loading

0 comments on commit a6c2896

Please sign in to comment.