Skip to content

Commit c9cd0b0

Browse files
committed
Add primitive logic to read and group transactions together
Signed-off-by: Florent Poinsard <[email protected]>
1 parent b33e5d1 commit c9cd0b0

14 files changed

+476
-148
lines changed

go/data/query.go

+32-1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,37 @@ type (
5353
}
5454
)
5555

56+
// ForeachSQLQuery reads a query log file and calls the provided function for each normal SQL query in the log.
57+
// If the query log contains directives, they will be read and queries will be skipped as necessary.
58+
func ForeachSQLQuery(loader IteratorLoader, f func(Query) error) error {
59+
skip := false
60+
for {
61+
query, kontinue := loader.Next()
62+
if !kontinue {
63+
break
64+
}
65+
66+
switch query.Type {
67+
case Skip, Error, VExplain:
68+
skip = true
69+
case Unknown:
70+
return fmt.Errorf("unknown command type: %s", query.Type)
71+
case Comment, CommentWithCommand, EmptyLine, WaitForAuthoritative, SkipIfBelowVersion:
72+
// no-op for keys
73+
case SQLQuery:
74+
if skip {
75+
skip = false
76+
continue
77+
}
78+
if err := f(query); err != nil {
79+
return err
80+
}
81+
}
82+
}
83+
84+
return nil
85+
}
86+
5687
// for a single query, it has some prefix. Prefix mapps to a query type.
5788
// e.g query_vertical maps to typ.Q_QUERY_VERTICAL
5889
func (q *Query) getQueryType(qu string) error {
@@ -64,7 +95,7 @@ func (q *Query) getQueryType(qu string) error {
6495
if q.Type != CommentWithCommand {
6596
// A query that will sent to vitess
6697
q.Query = qu
67-
q.Type = QueryT
98+
q.Type = SQLQuery
6899
} else {
69100
log.WithFields(log.Fields{"line": q.Line, "command": q.FirstWord, "arguments": q.Query}).Error("invalid command")
70101
return fmt.Errorf("invalid command %s", q.FirstWord)

go/data/query_log_parse.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func (s *mysqlLogReaderState) finalizeQuery() Query {
147147
query := Query{
148148
Query: s.prevQuery,
149149
Line: s.queryStart,
150-
Type: QueryT,
150+
Type: SQLQuery,
151151
ConnectionID: s.prevConnectionID,
152152
}
153153
s.prevQuery = ""
@@ -159,7 +159,7 @@ func (s *mysqlLogReaderState) processQuery(matches []string) Query {
159159
query := Query{
160160
Query: s.prevQuery,
161161
Line: s.queryStart,
162-
Type: QueryT,
162+
Type: SQLQuery,
163163
ConnectionID: s.prevConnectionID,
164164
}
165165
s.prevQuery = ""

go/data/query_log_parse_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,12 @@ func TestSmallSnippet(t *testing.T) {
4141
{
4242
Query: "SET GLOBAL log_output = 'FILE'",
4343
Line: 4,
44-
Type: QueryT,
44+
Type: SQLQuery,
4545
ConnectionID: 32,
4646
}, {
4747
Query: "show databases",
4848
Line: 5,
49-
Type: QueryT,
49+
Type: SQLQuery,
5050
ConnectionID: 32,
5151
}, {
5252
Query: `UPDATE _vt.schema_migrations
@@ -74,7 +74,7 @@ WHERE
7474
)
7575
LIMIT 1`,
7676
Line: 6,
77-
Type: QueryT,
77+
Type: SQLQuery,
7878
ConnectionID: 24,
7979
},
8080
}

go/data/typ.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import "strings"
2121
type CmdType int
2222

2323
const (
24-
QueryT CmdType = iota
24+
SQLQuery CmdType = iota
2525
Error
2626
Skip
2727
Unknown
@@ -37,7 +37,7 @@ const (
3737
)
3838

3939
var commandMap = map[string]CmdType{ //nolint:gochecknoglobals // this is instead of a const
40-
"query": QueryT,
40+
"query": SQLQuery,
4141
"error": Error,
4242
"skip": Skip,
4343
"skip_if_below_version": SkipIfBelowVersion,

go/data/vtgate_log_parse.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func (s *vtgateLogReaderState) Next() (Query, bool) {
112112
return Query{
113113
Query: query,
114114
Line: s.lineNumber,
115-
Type: QueryT,
115+
Type: SQLQuery,
116116
ConnectionID: connectionID,
117117
}, true
118118
}
@@ -137,7 +137,7 @@ func (s *vtgateLogReaderState) Next() (Query, bool) {
137137
return Query{
138138
Query: parsedQuery,
139139
Line: s.lineNumber,
140-
Type: QueryT,
140+
Type: SQLQuery,
141141
ConnectionID: connectionID,
142142
}, true
143143
}

go/keys/keys.go

+8-25
Original file line numberDiff line numberDiff line change
@@ -77,45 +77,28 @@ func Run(cfg Config) error {
7777
}
7878

7979
func run(out io.Writer, cfg Config) error {
80-
si := &schemaInfo{
81-
tables: make(map[string]columns),
80+
si := &SchemaInfo{
81+
Tables: make(map[string]Columns),
8282
}
8383
ql := &queryList{
8484
queries: make(map[string]*QueryAnalysisResult),
8585
failed: make(map[string]*QueryFailedResult),
8686
}
8787

8888
loader := cfg.Loader.Load(cfg.FileName)
89-
skip := false
90-
for {
91-
query, kontinue := loader.Next()
92-
if !kontinue {
93-
break
94-
}
9589

96-
switch query.Type {
97-
case data.Skip, data.Error, data.VExplain:
98-
skip = true
99-
case data.Unknown:
100-
return fmt.Errorf("unknown command type: %s", query.Type)
101-
case data.Comment, data.CommentWithCommand, data.EmptyLine, data.WaitForAuthoritative, data.SkipIfBelowVersion:
102-
// no-op for keys
103-
case data.QueryT:
104-
if skip {
105-
skip = false
106-
continue
107-
}
108-
process(query, si, ql)
109-
}
110-
}
90+
_ = data.ForeachSQLQuery(loader, func(query data.Query) error {
91+
process(query, si, ql)
92+
return nil
93+
})
11194

11295
closeErr := loader.Close()
11396
jsonWriteErr := ql.writeJSONTo(out)
11497

11598
return errors.Join(closeErr, jsonWriteErr)
11699
}
117100

118-
func process(q data.Query, si *schemaInfo, ql *queryList) {
101+
func process(q data.Query, si *SchemaInfo, ql *queryList) {
119102
ast, bv, err := sqlparser.NewTestParser().Parse2(q.Query)
120103
if err != nil {
121104
ql.addFailedQuery(q, err)
@@ -130,7 +113,7 @@ func process(q data.Query, si *schemaInfo, ql *queryList) {
130113
}
131114
}
132115

133-
func (ql *queryList) processQuery(si *schemaInfo, ast sqlparser.Statement, q data.Query, bv sqlparser.BindVars) {
116+
func (ql *queryList) processQuery(si *SchemaInfo, ast sqlparser.Statement, q data.Query, bv sqlparser.BindVars) {
134117
// handle panics
135118
defer func() {
136119
if r := recover(); r != nil {

go/keys/keys_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,9 @@ func TestKeys(t *testing.T) {
8282
func TestKeysNonAuthoritativeTable(t *testing.T) {
8383
q := data.Query{
8484
Query: "select id from user where id = 20",
85-
Type: data.QueryT,
85+
Type: data.SQLQuery,
8686
}
87-
si := &schemaInfo{}
87+
si := &SchemaInfo{}
8888
ql := &queryList{
8989
queries: make(map[string]*QueryAnalysisResult),
9090
failed: make(map[string]*QueryFailedResult),

go/keys/schemaInfo.go

+20-20
Original file line numberDiff line numberDiff line change
@@ -27,42 +27,42 @@ import (
2727
"vitess.io/vitess/go/vt/vtgate/vindexes"
2828
)
2929

30-
var _ semantics.SchemaInformation = (*schemaInfo)(nil)
30+
var _ semantics.SchemaInformation = (*SchemaInfo)(nil)
3131

3232
type (
33-
schemaInfo struct {
34-
ksName string
35-
tables map[string]columns
33+
SchemaInfo struct {
34+
KsName string
35+
Tables map[string]Columns
3636
}
3737

38-
columns []vindexes.Column
38+
Columns []vindexes.Column
3939
)
4040

41-
func (s *schemaInfo) handleCreateTable(create *sqlparser.CreateTable) {
42-
columns := make(columns, 0, len(create.TableSpec.Columns))
41+
func (s *SchemaInfo) handleCreateTable(create *sqlparser.CreateTable) {
42+
columns := make(Columns, 0, len(create.TableSpec.Columns))
4343
for _, col := range create.TableSpec.Columns {
4444
columns = append(columns, vindexes.Column{
4545
Name: col.Name,
4646
Type: col.Type.SQLType(),
4747
})
4848
}
49-
s.tables[create.Table.Name.String()] = columns
49+
s.Tables[create.Table.Name.String()] = columns
5050
}
5151

52-
func (s *schemaInfo) FindTableOrVindex(tablename sqlparser.TableName) (*vindexes.Table, vindexes.Vindex, string, topodata.TabletType, key.Destination, error) {
52+
func (s *SchemaInfo) FindTableOrVindex(tablename sqlparser.TableName) (*vindexes.Table, vindexes.Vindex, string, topodata.TabletType, key.Destination, error) {
5353
var tbl *vindexes.Table
5454
ks := tablename.Qualifier.String()
5555
if ks == "" {
56-
ks = s.ksName
56+
ks = s.KsName
5757
}
5858

59-
if !tablename.Qualifier.NotEmpty() || tablename.Qualifier.String() == s.ksName {
59+
if !tablename.Qualifier.NotEmpty() || tablename.Qualifier.String() == s.KsName {
6060
// This is a table from our keyspace. We should be able to find it
61-
columns, found := s.tables[tablename.Name.String()]
61+
columns, found := s.Tables[tablename.Name.String()]
6262
if found {
6363
tbl = &vindexes.Table{
6464
Name: tablename.Name,
65-
Keyspace: &vindexes.Keyspace{Name: s.ksName, Sharded: true},
65+
Keyspace: &vindexes.Keyspace{Name: s.KsName, Sharded: true},
6666
Columns: columns,
6767
ColumnListAuthoritative: true,
6868
}
@@ -81,30 +81,30 @@ func (s *schemaInfo) FindTableOrVindex(tablename sqlparser.TableName) (*vindexes
8181
return tbl, nil, ks, topodata.TabletType_REPLICA, nil, nil
8282
}
8383

84-
func (s *schemaInfo) ConnCollation() collations.ID {
84+
func (s *SchemaInfo) ConnCollation() collations.ID {
8585
return collations.CollationBinaryID
8686
}
8787

88-
func (s *schemaInfo) Environment() *vtenv.Environment {
88+
func (s *SchemaInfo) Environment() *vtenv.Environment {
8989
return vtenv.NewTestEnv()
9090
}
9191

92-
func (s *schemaInfo) ForeignKeyMode(string) (vschemapb.Keyspace_ForeignKeyMode, error) {
92+
func (s *SchemaInfo) ForeignKeyMode(string) (vschemapb.Keyspace_ForeignKeyMode, error) {
9393
return vschemapb.Keyspace_unmanaged, nil
9494
}
9595

96-
func (s *schemaInfo) GetForeignKeyChecksState() *bool {
96+
func (s *SchemaInfo) GetForeignKeyChecksState() *bool {
9797
return nil
9898
}
9999

100-
func (s *schemaInfo) KeyspaceError(string) error {
100+
func (s *SchemaInfo) KeyspaceError(string) error {
101101
return nil
102102
}
103103

104-
func (s *schemaInfo) GetAggregateUDFs() []string {
104+
func (s *SchemaInfo) GetAggregateUDFs() []string {
105105
return nil // TODO: maybe this should be a flag?
106106
}
107107

108-
func (s *schemaInfo) FindMirrorRule(sqlparser.TableName) (*vindexes.MirrorRule, error) {
108+
func (s *SchemaInfo) FindMirrorRule(sqlparser.TableName) (*vindexes.MirrorRule, error) {
109109
return nil, nil
110110
}

go/keys/schemaInfo_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929
func TestSchemaInfo(t *testing.T) {
3030
parser := sqlparser.NewTestParser()
3131

32-
si := &schemaInfo{tables: make(map[string]columns)}
32+
si := &SchemaInfo{Tables: make(map[string]Columns)}
3333

3434
ast, err := parser.Parse(`CREATE TABLE IF NOT EXISTS warehouse (
3535
w_id INT NOT NULL,

go/testdata/small-slow-query-log

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/bin/mysqld, Version: 8.0.26 (Source distribution). started with:
2+
Tcp port: 3306 Unix socket: /tmp/mysql.sock
3+
# Time: 2023-08-01T12:00:01.852861Z
4+
# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060
5+
# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
6+
SET timestamp=1690891201;
7+
begin;
8+
# Time: 2023-08-01T12:00:01.852861Z
9+
# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060
10+
# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
11+
SET timestamp=1690891201;
12+
update tblA set apa = 'toto' where foo = 12 and id = 43;
13+
# Time: 2023-08-01T12:00:01.852861Z
14+
# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060
15+
# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
16+
SET timestamp=1690891201;
17+
update tblB set monkey = 'pippi' where bar = 12 and id = 44;
18+
# Time: 2023-08-01T12:00:01.852861Z
19+
# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060
20+
# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
21+
SET timestamp=1690891201;
22+
commit;
23+
# Time: 2023-08-01T12:00:01.852861Z
24+
# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060
25+
# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
26+
SET timestamp=1690891201;
27+
begin;
28+
# Time: 2023-08-01T12:00:01.852861Z
29+
# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060
30+
# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
31+
SET timestamp=1690891201;
32+
update tblA set apa = 'toto' where foo = 43 and id = 5;
33+
# Time: 2023-08-01T12:00:01.852861Z
34+
# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060
35+
# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
36+
SET timestamp=1690891201;
37+
update tblB set monkey = 'pippi' where bar = 43 and id = 16;
38+
# Time: 2023-08-01T12:00:01.852861Z
39+
# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060
40+
# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
41+
SET timestamp=1690891201;
42+
commit;

go/testdata/small-slow-query-log.json

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
[
2+
{
3+
"Queries": [
4+
"update tblA set apa = 'toto' where foo = :1 and id = :2",
5+
"update tblB set monkey = 'pippi' where bar = :1 and id = :3"
6+
],
7+
"Count": 0,
8+
"Predicates": [
9+
"tblA.foo = :1",
10+
"tblA.id = :2",
11+
"tblB.bar = :1",
12+
"tblB.id = :3"
13+
]
14+
},
15+
{
16+
"Queries": [
17+
"update tblA set apa = 'toto' where foo = :1 and id = :2",
18+
"update tblB set monkey = 'pippi' where bar = :1 and id = :3"
19+
],
20+
"Count": 0,
21+
"Predicates": [
22+
"tblA.foo = :1",
23+
"tblA.id = :2",
24+
"tblB.bar = :1",
25+
"tblB.id = :3"
26+
]
27+
}
28+
]

go/tester/tester.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func (t *Tester) handleQuery(q data.Query) {
189189
t.prepareVExplain(q.Query)
190190
case data.WaitForAuthoritative:
191191
t.waitAuthoritative(q.Query)
192-
case data.QueryT:
192+
case data.SQLQuery:
193193
if t.vexplain == "" {
194194
t.runQuery(q)
195195
return

0 commit comments

Comments
 (0)