Skip to content

Commit 06118b4

Browse files
committed
wip
1 parent b33e5d1 commit 06118b4

9 files changed

+94
-79
lines changed

go/data/query.go

+32-1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,37 @@ type (
5353
}
5454
)
5555

56+
// ForeachSQLQuery reads a query log file and calls the provided function for each normal SQL query in the log.
57+
// If the query log contains directives, they will be read and queries will be skipped as necessary.
58+
func ForeachSQLQuery(loader IteratorLoader, f func(Query) error) error {
59+
skip := false
60+
for {
61+
query, kontinue := loader.Next()
62+
if !kontinue {
63+
break
64+
}
65+
66+
switch query.Type {
67+
case Skip, Error, VExplain:
68+
skip = true
69+
case Unknown:
70+
return fmt.Errorf("unknown command type: %s", query.Type)
71+
case Comment, CommentWithCommand, EmptyLine, WaitForAuthoritative, SkipIfBelowVersion:
72+
// no-op for keys
73+
case SQLQuery:
74+
if skip {
75+
skip = false
76+
continue
77+
}
78+
if err := f(query); err != nil {
79+
return err
80+
}
81+
}
82+
}
83+
84+
return nil
85+
}
86+
5687
// for a single query, it has some prefix. Prefix mapps to a query type.
5788
// e.g query_vertical maps to typ.Q_QUERY_VERTICAL
5889
func (q *Query) getQueryType(qu string) error {
@@ -64,7 +95,7 @@ func (q *Query) getQueryType(qu string) error {
6495
if q.Type != CommentWithCommand {
6596
// A query that will sent to vitess
6697
q.Query = qu
67-
q.Type = QueryT
98+
q.Type = SQLQuery
6899
} else {
69100
log.WithFields(log.Fields{"line": q.Line, "command": q.FirstWord, "arguments": q.Query}).Error("invalid command")
70101
return fmt.Errorf("invalid command %s", q.FirstWord)

go/data/query_log_parse.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func (s *mysqlLogReaderState) finalizeQuery() Query {
147147
query := Query{
148148
Query: s.prevQuery,
149149
Line: s.queryStart,
150-
Type: QueryT,
150+
Type: SQLQuery,
151151
ConnectionID: s.prevConnectionID,
152152
}
153153
s.prevQuery = ""
@@ -159,7 +159,7 @@ func (s *mysqlLogReaderState) processQuery(matches []string) Query {
159159
query := Query{
160160
Query: s.prevQuery,
161161
Line: s.queryStart,
162-
Type: QueryT,
162+
Type: SQLQuery,
163163
ConnectionID: s.prevConnectionID,
164164
}
165165
s.prevQuery = ""

go/data/query_log_parse_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,12 @@ func TestSmallSnippet(t *testing.T) {
4141
{
4242
Query: "SET GLOBAL log_output = 'FILE'",
4343
Line: 4,
44-
Type: QueryT,
44+
Type: SQLQuery,
4545
ConnectionID: 32,
4646
}, {
4747
Query: "show databases",
4848
Line: 5,
49-
Type: QueryT,
49+
Type: SQLQuery,
5050
ConnectionID: 32,
5151
}, {
5252
Query: `UPDATE _vt.schema_migrations
@@ -74,7 +74,7 @@ WHERE
7474
)
7575
LIMIT 1`,
7676
Line: 6,
77-
Type: QueryT,
77+
Type: SQLQuery,
7878
ConnectionID: 24,
7979
},
8080
}

go/data/typ.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import "strings"
2121
type CmdType int
2222

2323
const (
24-
QueryT CmdType = iota
24+
SQLQuery CmdType = iota
2525
Error
2626
Skip
2727
Unknown
@@ -37,7 +37,7 @@ const (
3737
)
3838

3939
var commandMap = map[string]CmdType{ //nolint:gochecknoglobals // this is instead of a const
40-
"query": QueryT,
40+
"query": SQLQuery,
4141
"error": Error,
4242
"skip": Skip,
4343
"skip_if_below_version": SkipIfBelowVersion,

go/data/vtgate_log_parse.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func (s *vtgateLogReaderState) Next() (Query, bool) {
112112
return Query{
113113
Query: query,
114114
Line: s.lineNumber,
115-
Type: QueryT,
115+
Type: SQLQuery,
116116
ConnectionID: connectionID,
117117
}, true
118118
}
@@ -137,7 +137,7 @@ func (s *vtgateLogReaderState) Next() (Query, bool) {
137137
return Query{
138138
Query: parsedQuery,
139139
Line: s.lineNumber,
140-
Type: QueryT,
140+
Type: SQLQuery,
141141
ConnectionID: connectionID,
142142
}, true
143143
}

go/keys/keys.go

+4-21
Original file line numberDiff line numberDiff line change
@@ -86,28 +86,11 @@ func run(out io.Writer, cfg Config) error {
8686
}
8787

8888
loader := cfg.Loader.Load(cfg.FileName)
89-
skip := false
90-
for {
91-
query, kontinue := loader.Next()
92-
if !kontinue {
93-
break
94-
}
9589

96-
switch query.Type {
97-
case data.Skip, data.Error, data.VExplain:
98-
skip = true
99-
case data.Unknown:
100-
return fmt.Errorf("unknown command type: %s", query.Type)
101-
case data.Comment, data.CommentWithCommand, data.EmptyLine, data.WaitForAuthoritative, data.SkipIfBelowVersion:
102-
// no-op for keys
103-
case data.QueryT:
104-
if skip {
105-
skip = false
106-
continue
107-
}
108-
process(query, si, ql)
109-
}
110-
}
90+
_ = data.ForeachSQLQuery(loader, func(query data.Query) error {
91+
process(query, si, ql)
92+
return nil
93+
})
11194

11295
closeErr := loader.Close()
11396
jsonWriteErr := ql.writeJSONTo(out)

go/keys/keys_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func TestKeys(t *testing.T) {
8282
func TestKeysNonAuthoritativeTable(t *testing.T) {
8383
q := data.Query{
8484
Query: "select id from user where id = 20",
85-
Type: data.QueryT,
85+
Type: data.SQLQuery,
8686
}
8787
si := &schemaInfo{}
8888
ql := &queryList{

go/tester/tester.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func (t *Tester) handleQuery(q data.Query) {
189189
t.prepareVExplain(q.Query)
190190
case data.WaitForAuthoritative:
191191
t.waitAuthoritative(q.Query)
192-
case data.QueryT:
192+
case data.SQLQuery:
193193
if t.vexplain == "" {
194194
t.runQuery(q)
195195
return

go/transactions/transactions.go

+47-46
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package transactions
1818

1919
import (
20+
"io"
2021
"strings"
2122

2223
"github.com/vitessio/vt/go/data"
@@ -40,53 +41,10 @@ type (
4041
)
4142

4243
func Run(cfg Config) {
43-
// Figure out if autocommit is enabled
44-
// If we see:
45-
// 1. BEGIN we can assume autocommit is disabled
46-
// 2. COMMIT and no BEGIN we can assume autocommit is enabled
47-
// 3. ROLLBACK and no BEGIN we can assume autocommit is enabled
48-
// 4. SET autocommit = 1/0
49-
count := 1000
50-
defaultAutocommit := true
51-
loader := cfg.Loader.Load(cfg.FileName)
52-
for {
53-
count--
54-
if count == 0 {
55-
// enough already. we'll assume autocommit is enabled because that is the default
56-
break
57-
}
58-
query, kontinue := loader.Next()
59-
if !kontinue {
60-
break
61-
}
62-
63-
switch query.Type {
64-
case data.Skip, data.Error, data.VExplain, data.Unknown:
65-
panic("unexpected query type")
66-
case data.Comment, data.CommentWithCommand, data.EmptyLine, data.WaitForAuthoritative, data.SkipIfBelowVersion:
67-
// no-op for keys
68-
case data.QueryT:
69-
stmt, err := sqlparser.NewTestParser().Parse(query.Query)
70-
if err != nil {
71-
continue
72-
}
73-
switch stmt.(type) {
74-
case *sqlparser.Begin:
75-
defaultAutocommit = false
76-
break
77-
case *sqlparser.Commit:
78-
break
79-
}
80-
}
81-
}
82-
err := loader.Close()
83-
if err != nil {
84-
panic(err.Error())
85-
}
86-
44+
defaultAutocommit := GetAutocommitGuess(cfg)
8745
transactions := map[int]*Connection{}
8846

89-
loader = cfg.Loader.Load(cfg.FileName)
47+
loader := cfg.Loader.Load(cfg.FileName)
9048
ch := make(chan []data.Query, 1000)
9149

9250
for {
@@ -100,7 +58,7 @@ func Run(cfg Config) {
10058
panic("unexpected query type")
10159
case data.Comment, data.CommentWithCommand, data.EmptyLine, data.WaitForAuthoritative, data.SkipIfBelowVersion:
10260
// no-op for keys
103-
case data.QueryT:
61+
case data.SQLQuery:
10462
stmt, err := sqlparser.NewTestParser().Parse(query.Query)
10563
if err != nil {
10664
continue
@@ -144,3 +102,46 @@ func Run(cfg Config) {
144102
}
145103
}
146104
}
105+
106+
func GetAutocommitGuess(cfg Config) bool {
107+
// Figure out if autocommit is enabled
108+
// If we see:
109+
// 1. BEGIN we can assume autocommit is disabled
110+
// 2. COMMIT and no BEGIN we can assume autocommit is enabled
111+
// 3. ROLLBACK and no BEGIN we can assume autocommit is enabled
112+
// 4. SET autocommit = 1/0
113+
count := 1000
114+
defaultAutocommit := true
115+
loader := cfg.Loader.Load(cfg.FileName)
116+
defer func() {
117+
err := loader.Close()
118+
if err != nil {
119+
panic(err.Error())
120+
}
121+
}()
122+
_ = data.ForeachSQLQuery(loader, func(query data.Query) error {
123+
count--
124+
if count == 0 {
125+
// enough already. we'll assume autocommit is enabled because that is the default
126+
return io.EOF
127+
}
128+
129+
stmt, err := sqlparser.NewTestParser().Parse(query.Query)
130+
if err != nil {
131+
return nil
132+
}
133+
134+
switch stmt.(type) {
135+
case *sqlparser.Begin:
136+
// BEGIN seen, so autocommit is disabled
137+
return io.EOF
138+
case *sqlparser.Commit:
139+
defaultAutocommit = false
140+
// no BEGIN seen, so autocommit is disabled
141+
return io.EOF
142+
}
143+
144+
return nil
145+
})
146+
return defaultAutocommit
147+
}

0 commit comments

Comments
 (0)