Skip to content

Commit 030f025

Browse files
committed
feat: clean up signatures
Signed-off-by: Andres Taylor <[email protected]>
1 parent f22056c commit 030f025

File tree

3 files changed

+81
-22
lines changed

3 files changed

+81
-22
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
[
22
{
3-
"Queries": [
4-
"update tblA set apa = :v where foo = :0 and id = :1",
5-
"update tblB set monkey = :v where bar = :0 and id = :2"
3+
"query-signatures": [
4+
"update tblA where foo = :0 and id = :1 set apa",
5+
"update tblB where bar = :0 and id = :2 set monkey"
66
],
7-
"Predicates": [
7+
"predicates": [
88
"tblA.foo = 0",
9-
"tblA.id = 1",
9+
"tblA.id = ?",
1010
"tblB.bar = 0",
11-
"tblB.id = 2"
11+
"tblB.id = ?"
1212
],
13-
"Count": 2
13+
"count": 2
1414
}
1515
]

go/transactions/transaction_signature.go

+46-5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"fmt"
2323
"hash/fnv"
2424
"sort"
25+
"strconv"
2526

2627
"vitess.io/vitess/go/vt/sqlparser"
2728
)
@@ -46,7 +47,11 @@ type (
4647
)
4748

4849
func (pi predicateInfo) String() string {
49-
return fmt.Sprintf("%s.%s %s %d", pi.Table, pi.Col, pi.Op.ToString(), pi.Val)
50+
val := strconv.Itoa(pi.Val)
51+
if pi.Val == -1 {
52+
val = "?"
53+
}
54+
return fmt.Sprintf("%s.%s %s %s", pi.Table, pi.Col, pi.Op.ToString(), val)
5055
}
5156

5257
func (pi predicateInfo) compareTo(b predicateInfo) int {
@@ -69,9 +74,9 @@ func (tx *TxSignature) MarshalJSON() ([]byte, error) {
6974
}
7075

7176
return json.Marshal(struct {
72-
Queries []string
73-
Predicates []string
74-
Count int
77+
Queries []string `json:"query-signatures"`
78+
Predicates []string `json:"predicates"`
79+
Count int `json:"count"`
7580
}{
7681
Queries: tx.Queries,
7782
Predicates: predicateStrings,
@@ -164,13 +169,49 @@ func (tx *TxSignature) Equals(other *TxSignature) bool {
164169
return true
165170
}
166171

172+
// CleanUp removes values that are only used once and replaces them with -1
173+
func (tx *TxSignature) CleanUp() *TxSignature {
174+
newPredicates := make([]predicateInfo, 0, len(tx.Predicates))
175+
usedValues := make(map[int]int)
176+
177+
// First let's count how many times each value is used
178+
for _, pred := range tx.Predicates {
179+
usedValues[pred.Val]++
180+
}
181+
182+
// Now we replace values only used once with -1
183+
newCount := 0
184+
newValues := make(map[int]int)
185+
for _, pred := range tx.Predicates {
186+
if usedValues[pred.Val] == 1 {
187+
pred.Val = -1
188+
} else {
189+
newVal, found := newValues[pred.Val]
190+
if !found {
191+
// Assign a new value to this predicate
192+
newVal = newCount
193+
newCount++
194+
newValues[pred.Val] = newVal
195+
}
196+
pred.Val = newVal
197+
}
198+
newPredicates = append(newPredicates, pred)
199+
}
200+
201+
return &TxSignature{
202+
Queries: tx.Queries,
203+
Predicates: newPredicates,
204+
Count: tx.Count,
205+
}
206+
}
207+
167208
func (m *txSignatureMap) MarshalJSON() ([]byte, error) {
168209
// Collect all interesting TxSignatures into a slice
169210
var signatures []*TxSignature
170211
for _, bucket := range m.data {
171212
for _, txSig := range bucket {
172213
if txSig.Count > 1 {
173-
signatures = append(signatures, txSig)
214+
signatures = append(signatures, txSig.CleanUp())
174215
}
175216
}
176217
}

go/transactions/transactions.go

+28-10
Original file line numberDiff line numberDiff line change
@@ -223,21 +223,39 @@ func (s *state) consume(ch <-chan []sqlparser.Statement, wg *sync.WaitGroup) {
223223
}
224224
}
225225

226-
func (s *state) consumeUpdate(query *sqlparser.Update, st *semantics.SemTable, n *normalizer, tx *TxSignature) {
227-
defer func() {
228-
tx.Queries = append(tx.Queries, sqlparser.String(query))
229-
}()
226+
func querySignatureForUpd(query *sqlparser.Update) string {
227+
buffer := sqlparser.NewTrackedBuffer(nil)
228+
builder := buffer.Builder
229+
builder.WriteString("update ")
230+
231+
for i, tbl := range query.TableExprs {
232+
tbl.Format(buffer)
233+
if i < len(query.TableExprs)-1 {
234+
buffer.WriteString(", ")
235+
}
236+
}
237+
238+
if query.Where != nil {
239+
query.Where.Format(buffer)
240+
}
241+
242+
builder.WriteString(" set ")
230243

231-
// Normalize the AST our own way:
232-
// - Replace the value in SET by "v"
233-
// - Replace the literals found in where clause comparisons by the corresponding ID we got earlier
234244
for i, expr := range query.Exprs {
235-
query.Exprs[i] = &sqlparser.UpdateExpr{
236-
Name: expr.Name,
237-
Expr: sqlparser.NewArgument("v"),
245+
expr.Name.Format(buffer)
246+
if i < len(query.Exprs)-1 {
247+
buffer.WriteString(", ")
238248
}
239249
}
240250

251+
return builder.String()
252+
}
253+
254+
func (s *state) consumeUpdate(query *sqlparser.Update, st *semantics.SemTable, n *normalizer, tx *TxSignature) {
255+
defer func() {
256+
tx.Queries = append(tx.Queries, querySignatureForUpd(query))
257+
}()
258+
241259
if query.Where == nil {
242260
return
243261
}

0 commit comments

Comments
 (0)