Skip to content

Commit 3ea3d67

Browse files
committed
replication: Add mysql::serialization based Gtid Log Event
1 parent 0076e13 commit 3ea3d67

File tree

3 files changed

+346
-0
lines changed

3 files changed

+346
-0
lines changed

replication/event.go

+89
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"unicode"
1313

1414
"github.com/go-mysql-org/go-mysql/mysql"
15+
"github.com/go-mysql-org/go-mysql/serialization"
1516
"github.com/google/uuid"
1617
"github.com/pingcap/errors"
1718
)
@@ -543,6 +544,94 @@ func (e *GTIDEvent) OriginalCommitTime() time.Time {
543544
return microSecTimestampToTime(e.OriginalCommitTimestamp)
544545
}
545546

547+
type GtidTaggedLogEvent struct {
548+
msg serialization.Message
549+
}
550+
551+
func (e *GtidTaggedLogEvent) Decode(data []byte) error {
552+
e.msg = serialization.Message{
553+
Format: serialization.Format{
554+
Fields: []serialization.Field{
555+
{
556+
Name: "gtid_flags",
557+
Type: serialization.FieldIntFixed{
558+
Length: 1,
559+
},
560+
},
561+
{
562+
Name: "uuid",
563+
Type: serialization.FieldIntFixed{
564+
Length: 16,
565+
},
566+
},
567+
{
568+
Name: "gno",
569+
Type: serialization.FieldIntVar{},
570+
},
571+
{
572+
Name: "tag",
573+
Type: serialization.FieldString{},
574+
},
575+
{
576+
Name: "last_committed",
577+
Type: serialization.FieldIntVar{},
578+
},
579+
{
580+
Name: "sequence_number",
581+
Type: serialization.FieldIntVar{},
582+
},
583+
{
584+
Name: "immediate_commit_timestamp",
585+
Type: serialization.FieldIntVar{
586+
Unsigned: true,
587+
},
588+
},
589+
{
590+
Name: "original_commit_timestamp",
591+
Type: serialization.FieldIntVar{
592+
Unsigned: true,
593+
},
594+
Optional: true,
595+
},
596+
{
597+
Name: "transaction_length",
598+
Type: serialization.FieldIntVar{
599+
Unsigned: true,
600+
},
601+
},
602+
{
603+
Name: "immediate_server_version",
604+
Type: serialization.FieldIntVar{
605+
Unsigned: true,
606+
},
607+
},
608+
{
609+
Name: "original_server_version",
610+
Type: serialization.FieldIntVar{
611+
Unsigned: true,
612+
},
613+
Optional: true,
614+
},
615+
{
616+
Name: "commit_group_ticket",
617+
Optional: true,
618+
},
619+
},
620+
},
621+
}
622+
623+
err := serialization.Unmarshal(data, &e.msg)
624+
if err != nil {
625+
return err
626+
}
627+
628+
return nil
629+
}
630+
631+
func (e *GtidTaggedLogEvent) Dump(w io.Writer) {
632+
fmt.Println(e.msg.String())
633+
}
634+
546635
type BeginLoadQueryEvent struct {
547636
FileID uint32
548637
BlockData []byte

replication/parser.go

+2
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,8 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
293293
e = &GTIDEvent{}
294294
case ANONYMOUS_GTID_EVENT:
295295
e = &GTIDEvent{}
296+
case GTID_TAGGED_LOG_EVENT:
297+
e = &GtidTaggedLogEvent{}
296298
case BEGIN_LOAD_QUERY_EVENT:
297299
e = &BeginLoadQueryEvent{}
298300
case EXECUTE_LOAD_QUERY_EVENT:

serialization/serialization.go

+255
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
package serialization
2+
3+
import (
4+
"bytes"
5+
"encoding/binary"
6+
"errors"
7+
"fmt"
8+
"io"
9+
"slices"
10+
"strings"
11+
)
12+
13+
// mysql::serialization is a serialization format introduced with tagged GTIDs
14+
//
15+
// https://dev.mysql.com/doc/dev/mysql-server/latest/PageLibsMysqlSerialization.html
16+
17+
type Message struct {
18+
Version uint8 // >= 0
19+
Format Format
20+
}
21+
22+
func (m *Message) String() (text string) {
23+
text += fmt.Sprintf("Message (version: %d)", m.Version)
24+
for _, line := range strings.Split(m.Format.String(), "\n") {
25+
text += "\n " + line
26+
}
27+
return
28+
}
29+
30+
type Format struct {
31+
Size uint64
32+
LastNonIgnorableField int
33+
Fields []Field
34+
}
35+
36+
func (f *Format) String() (text string) {
37+
text += fmt.Sprintf("Format (Size: %d, LastNonIgnorableField: %d)\n", f.Size, f.LastNonIgnorableField)
38+
for _, f := range f.Fields {
39+
text += fmt.Sprintf("Field %02d (Name: %s, Skipped: %t, Type: %T)\n", f.ID, f.Name, f.Skipped, f.Type)
40+
if f.Type != nil {
41+
text += fmt.Sprintf(" Value: %s\n", f.Type.String())
42+
}
43+
}
44+
return text
45+
}
46+
47+
type Field struct {
48+
ID int
49+
Type FieldType
50+
Optional bool
51+
Name string
52+
Skipped bool
53+
}
54+
55+
type FieldType interface {
56+
fmt.Stringer
57+
}
58+
59+
type FieldIntFixed struct {
60+
Length int // Length of value before encoding, encoded value can be more
61+
Value []byte
62+
}
63+
64+
func (f FieldIntFixed) String() string {
65+
if f.Value == nil {
66+
return ""
67+
}
68+
return fmt.Sprintf("0x%x", f.Value)
69+
}
70+
71+
type FieldIntVar struct {
72+
Value uint64
73+
Unsigned bool
74+
}
75+
76+
func (f FieldIntVar) String() string {
77+
return fmt.Sprintf("%d", f.Value)
78+
}
79+
80+
type FieldString struct {
81+
Value string
82+
}
83+
84+
func (f FieldString) String() string {
85+
return string(f.Value)
86+
}
87+
88+
type Marshaler interface {
89+
MarshalMySQLSerial() ([]byte, error)
90+
}
91+
92+
func Unmarshal(data []byte, v interface{}) error {
93+
r := bytes.NewReader(data)
94+
switch m := v.(type) {
95+
case *Message:
96+
messageLen := 1
97+
tmpVer := make([]byte, messageLen)
98+
_, err := r.Read(tmpVer)
99+
if err != nil {
100+
return err
101+
}
102+
m.Version = tmpVer[0] / 2
103+
104+
err = Unmarshal(data[messageLen:], &m.Format)
105+
if err != nil {
106+
return err
107+
}
108+
case *Format:
109+
formatLen := 2
110+
tmpFormat := make([]byte, formatLen)
111+
_, err := r.Read(tmpFormat)
112+
if err != nil {
113+
return err
114+
}
115+
m.Size = uint64(tmpFormat[0] / 2)
116+
m.LastNonIgnorableField = int(tmpFormat[1] / 2)
117+
118+
for i := 0; i < len(m.Fields); i++ {
119+
tmpField := make([]byte, 1)
120+
_, err := r.Read(tmpField)
121+
if err != nil {
122+
if errors.Is(err, io.EOF) {
123+
break
124+
}
125+
return err
126+
}
127+
if int(tmpField[0]/2) != i {
128+
// The field number we got doesn't match what we expect, so a field was skipped.
129+
// Rewind the reader and skip.
130+
m.Fields[i].ID = i
131+
m.Fields[i].Skipped = true
132+
r.Seek(-1, io.SeekCurrent)
133+
continue
134+
}
135+
m.Fields[i].ID = int(tmpField[0] / 2)
136+
switch f := m.Fields[i].Type.(type) {
137+
case FieldIntFixed:
138+
tmpVal, err := decodeFixed(r, f.Length)
139+
if err != nil {
140+
return err
141+
}
142+
f.Value = tmpVal
143+
m.Fields[i].Type = f
144+
case FieldIntVar:
145+
firstByte := make([]byte, 1)
146+
_, err := r.Read(firstByte)
147+
if err != nil {
148+
return err
149+
}
150+
tb := trailingOneBitCount(firstByte[0])
151+
_, err = r.Seek(-1, io.SeekCurrent)
152+
if err != nil {
153+
return err
154+
}
155+
fieldBytes := make([]byte, tb+1)
156+
_, err = r.Read(fieldBytes)
157+
if err != nil {
158+
return err
159+
}
160+
var tNum uint64
161+
switch len(fieldBytes) {
162+
case 1:
163+
tNum = uint64(fieldBytes[0])
164+
case 2:
165+
tNum = uint64(binary.LittleEndian.Uint16(fieldBytes))
166+
case 3:
167+
tNum = uint64(binary.LittleEndian.Uint32(slices.Concat(fieldBytes, []byte{0x0})))
168+
case 4:
169+
tNum = uint64(binary.LittleEndian.Uint32(fieldBytes))
170+
case 5:
171+
tNum = binary.LittleEndian.Uint64(slices.Concat(fieldBytes, []byte{0x0, 0x0, 0x0}))
172+
case 6:
173+
tNum = binary.LittleEndian.Uint64(slices.Concat(fieldBytes, []byte{0x0, 0x0}))
174+
case 7:
175+
tNum = binary.LittleEndian.Uint64(slices.Concat(fieldBytes, []byte{0x0}))
176+
case 8:
177+
tNum = binary.LittleEndian.Uint64(fieldBytes)
178+
}
179+
if f.Unsigned {
180+
f.Value = tNum >> (tb + 2) * 2
181+
} else {
182+
f.Value = tNum >> (tb + 2)
183+
}
184+
m.Fields[i].Type = f
185+
case FieldString:
186+
firstByte := make([]byte, 1)
187+
_, err := r.Read(firstByte)
188+
if err != nil {
189+
return err
190+
}
191+
strBytes := make([]byte, firstByte[0]/2)
192+
_, err = r.Read(strBytes)
193+
if err != nil {
194+
return err
195+
}
196+
f.Value = string(strBytes)
197+
m.Fields[i].Type = f
198+
default:
199+
return fmt.Errorf("unsupported field type: %T", m.Fields[i].Type)
200+
}
201+
}
202+
203+
default:
204+
return fmt.Errorf("unsupported type: %T", v)
205+
}
206+
return nil
207+
}
208+
209+
func decodeFixed(r io.Reader, len int) ([]byte, error) {
210+
var b bytes.Buffer
211+
212+
tmpInt := make([]byte, 1)
213+
for {
214+
_, err := r.Read(tmpInt)
215+
if err != nil {
216+
return nil, err
217+
}
218+
if tmpInt[0]%2 == 0 {
219+
b.WriteByte(tmpInt[0] / 2)
220+
} else {
221+
tmpInt2 := make([]byte, 1)
222+
_, err := r.Read(tmpInt2)
223+
if err != nil {
224+
return nil, err
225+
}
226+
switch tmpInt2[0] {
227+
case 0x2:
228+
b.WriteByte((tmpInt[0] >> 2) + 0x80)
229+
case 0x3:
230+
b.WriteByte((tmpInt[0] >> 2) + 0xc0)
231+
default:
232+
return nil, fmt.Errorf("unknown decoding for %v", tmpInt2[0])
233+
}
234+
}
235+
if b.Len() == len {
236+
break
237+
}
238+
}
239+
return b.Bytes(), nil
240+
}
241+
242+
func trailingOneBitCount(b byte) (count int) {
243+
var i byte = 0x1
244+
for {
245+
if b&i == 0 {
246+
break
247+
}
248+
count++
249+
if i >= 0x80 {
250+
break
251+
}
252+
i = i << 1
253+
}
254+
return
255+
}

0 commit comments

Comments
 (0)