diff --git a/replication/event.go b/replication/event.go index 6fb614f61..c151f32ba 100644 --- a/replication/event.go +++ b/replication/event.go @@ -229,34 +229,91 @@ type PreviousGTIDsEvent struct { GTIDSets string } +type GtidFormat int + +const ( + GtidFormatClassic = iota + GtidFormatTagged +) + +// Decode the number of sids (source identifiers) and if it is using +// tagged GTIDs or classic (non-tagged) GTIDs. +// +// Note that each gtid tag increases the sidno here, so a single UUID +// might turn up multiple times if there are multipl tags. +// +// see also: +// decode_nsids_format in mysql/mysql-server +// https://github.com/mysql/mysql-server/blob/61a3a1d8ef15512396b4c2af46e922a19bf2b174/sql/rpl_gtid_set.cc#L1363-L1378 +func decodeSid(data []byte) (format GtidFormat, sidnr uint64) { + if data[7] == 1 { + format = GtidFormatTagged + } + + if format == GtidFormatTagged { + masked := make([]byte, 8) + copy(masked, data[1:7]) + sidnr = binary.LittleEndian.Uint64(masked) + return + } + sidnr = binary.LittleEndian.Uint64(data[:8]) + return +} + func (e *PreviousGTIDsEvent) Decode(data []byte) error { pos := 0 - uuidCount := binary.LittleEndian.Uint16(data[pos : pos+8]) + + format, uuidCount := decodeSid(data) pos += 8 previousGTIDSets := make([]string, uuidCount) - for i := range previousGTIDSets { + + currentSetnr := 0 + var buf strings.Builder + for range previousGTIDSets { uuid := e.decodeUuid(data[pos : pos+16]) pos += 16 + var tag string + if format == GtidFormatTagged { + tagLength := int(data[pos]) / 2 + pos += 1 + if tagLength > 0 { // 0 == no tag, >0 == tag + tag = string(data[pos : pos+tagLength]) + pos += tagLength + } + } + + if len(tag) > 0 { + buf.WriteString(":") + buf.WriteString(tag) + } else { + if currentSetnr != 0 { + buf.WriteString(",") + } + buf.WriteString(uuid) + currentSetnr += 1 + } + sliceCount := binary.LittleEndian.Uint16(data[pos : pos+8]) pos += 8 - intervals := make([]string, sliceCount) - for i := range intervals { + for range sliceCount { + buf.WriteString(":") + start := e.decodeInterval(data[pos : pos+8]) pos += 8 stop := e.decodeInterval(data[pos : pos+8]) pos += 8 - interval := "" if stop == start+1 { - interval = fmt.Sprintf("%d", start) + fmt.Fprintf(&buf, "%d", start) } else { - interval = fmt.Sprintf("%d-%d", start, stop-1) + fmt.Fprintf(&buf, "%d-%d", start, stop-1) } - intervals[i] = interval } - previousGTIDSets[i] = fmt.Sprintf("%s:%s", uuid, strings.Join(intervals, ":")) + if len(tag) == 0 { + currentSetnr += 1 + } } - e.GTIDSets = strings.Join(previousGTIDSets, ",") + e.GTIDSets = buf.String() return nil } diff --git a/replication/event_test.go b/replication/event_test.go index 1333cd5ef..d7fa43927 100644 --- a/replication/event_test.go +++ b/replication/event_test.go @@ -140,3 +140,57 @@ func TestIntVarEvent(t *testing.T) { require.Equal(t, INSERT_ID, ev.Type) require.Equal(t, uint64(23), ev.Value) } + +func TestDecodeSid(t *testing.T) { + testcases := []struct { + input []byte + gtidFormat GtidFormat + uuidCount uint64 + }{ + {[]byte{1, 2, 0, 0, 0, 0, 0, 1}, GtidFormatTagged, 2}, + {[]byte{1, 1, 0, 0, 0, 0, 0, 1}, GtidFormatTagged, 1}, + {[]byte{1, 0, 0, 0, 0, 0, 0, 1}, GtidFormatTagged, 0}, + {[]byte{1, 0, 0, 0, 0, 0, 0, 0}, GtidFormatClassic, 1}, + } + + for _, tc := range testcases { + format, uuidCount := decodeSid(tc.input) + assert.Equal(t, tc.gtidFormat, format) + assert.Equal(t, tc.uuidCount, uuidCount) + } +} + +func TestPreviousGTIDEvent(t *testing.T) { + testcases := []struct { + input []byte + GTIDSets string + }{ + { + []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1}, + "", + }, + { + []byte{0x1, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, + "896e7882-18fe-11ef-ab88-22222d34d411:1-3", + }, + { + []byte{0x1, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0x8, 0x61, 0x61, 0x61, 0x61, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, + "896e7882-18fe-11ef-ab88-22222d34d411:1-4:aaaa:1", + }, + { + []byte{0x1, 0x7, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0x8, 0x61, 0x61, 0x61, 0x61, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0x6, 0x61, 0x62, 0x63, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0xa, 0x62, 0x62, 0x62, 0x62, 0x62, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0xc, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, 0x2, 0x78, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x12, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, + "896e7882-18fe-11ef-ab88-22222d34d411:1-4:aaaa:1:abc:1-3:bbbbb:1:bbbbbb:1:x:1,896e7882-18fe-11ef-ab88-22222d34d412:1-2", + }, + { + []byte{0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x04, 0x2f, 0x20, 0xcc, 0xbc, 0x4c, 0x11, 0xef, 0xa1, 0xd0, 0x02, 0x42, 0xac, 0x11, 0x00, 0x02, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x2f, 0x20, 0xcc, 0xbc, 0x4c, 0x11, 0xef, 0xa1, 0xd0, 0x02, 0x42, 0xac, 0x11, 0x00, 0x02, 0x06, 0x61, 0x61, 0x61, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x2f, 0x20, 0xcc, 0xbc, 0x4c, 0x11, 0xef, 0xa1, 0xd0, 0x02, 0x42, 0xac, 0x11, 0x00, 0x02, 0x28, 0x74, 0x61, 0x67, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x30, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x2f, 0x20, 0xcc, 0xbc, 0x4c, 0x11, 0xef, 0xa1, 0xd0, 0x02, 0x42, 0xac, 0x11, 0x00, 0x02, 0x40, 0x74, 0x61, 0x67, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x30, 0x31, 0x32, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, + "042f20cc-bc4c-11ef-a1d0-0242ac110002:1-7:aaa:1:tag45678901234567890:1:tag45678901234567890123456789012:1", + }, + } + + for _, tc := range testcases { + e := PreviousGTIDsEvent{} + err := e.Decode(tc.input) + require.NoError(t, err) + require.Equal(t, tc.GTIDSets, e.GTIDSets) + } +}