Skip to content

Commit e974ee8

Browse files
Add logic for reading query log, batching queries related to a transaction, handle autocommit default/explicit setting. Queries sent to a channgel. No tests
Signed-off-by: Rohit Nayak <[email protected]>
1 parent 8ce83b7 commit e974ee8

File tree

1 file changed

+59
-7
lines changed

1 file changed

+59
-7
lines changed

go/transactions/transactions.go

+59-7
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ limitations under the License.
1717
package transactions
1818

1919
import (
20+
"strings"
21+
2022
"github.com/vitessio/vt/go/data"
23+
2124
"vitess.io/vitess/go/vt/sqlparser"
2225
)
2326

@@ -28,16 +31,12 @@ type (
2831
}
2932

3033
Connection struct {
31-
// The connection ID
32-
ID int
33-
34-
buf []data.Query
34+
Transaction []data.Query
3535

3636
Autocommit bool
3737
}
3838

39-
TxSignature struct {
40-
}
39+
TxSignature struct{}
4140
)
4241

4342
func Run(cfg Config) {
@@ -85,10 +84,63 @@ func Run(cfg Config) {
8584
panic(err.Error())
8685
}
8786

88-
connections := map[int]*Connection{}
87+
transactions := map[int]*Connection{}
8988

9089
loader = cfg.Loader.Load(cfg.FileName)
90+
ch := make(chan []data.Query, 1000)
91+
9192
for {
93+
query, kontinue := loader.Next()
94+
if !kontinue {
95+
break
96+
}
9297

98+
switch query.Type {
99+
case data.Skip, data.Error, data.VExplain, data.Unknown:
100+
panic("unexpected query type")
101+
case data.Comment, data.CommentWithCommand, data.EmptyLine, data.WaitForAuthoritative, data.SkipIfBelowVersion:
102+
// no-op for keys
103+
case data.QueryT:
104+
stmt, err := sqlparser.NewTestParser().Parse(query.Query)
105+
if err != nil {
106+
continue
107+
}
108+
switch stmt.(type) {
109+
case *sqlparser.Begin:
110+
case *sqlparser.Commit:
111+
connection := transactions[query.ConnectionID]
112+
ch <- connection.Transaction
113+
connection.Transaction = nil
114+
case *sqlparser.Set:
115+
set := stmt.(*sqlparser.Set)
116+
for _, expr := range set.Exprs {
117+
if expr.Var.Name.Lowered() == "autocommit" {
118+
val, ok := expr.Expr.(*sqlparser.Literal)
119+
if !ok {
120+
continue
121+
}
122+
val2 := strings.ToLower(val.Val)
123+
if val2 == "1" || val2 == "on" || val2 == "true" {
124+
transactions[query.ConnectionID].Autocommit = true
125+
} else {
126+
transactions[query.ConnectionID].Autocommit = false
127+
}
128+
}
129+
}
130+
default:
131+
if sqlparser.IsDMLStatement(stmt) {
132+
connection, ok := transactions[query.ConnectionID]
133+
if !ok {
134+
connection = &Connection{Autocommit: defaultAutocommit}
135+
transactions[query.ConnectionID] = connection
136+
}
137+
if connection.Autocommit {
138+
ch <- []data.Query{query}
139+
} else {
140+
connection.Transaction = append(connection.Transaction, query)
141+
}
142+
}
143+
}
144+
}
93145
}
94146
}

0 commit comments

Comments
 (0)