Skip to content

Commit

Permalink
parseComBinlogDumpGTID: GTID payload is always 5.6 flavor (#17605)
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Jan 23, 2025
1 parent 55b6865 commit 1d57f39
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 11 deletions.
8 changes: 5 additions & 3 deletions go/mysql/binlog_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,13 @@ func (c *Conn) parseComBinlogDumpGTID(data []byte) (logFile string, logPos uint6
if !ok {
return logFile, logPos, position, readPacketErr
}
if gtid := string(data[pos : pos+int(dataSize)]); gtid != "" {
position, err = replication.DecodePosition(gtid)
if gtidBytes := data[pos : pos+int(dataSize)]; len(gtidBytes) != 0 {
gtid, err := replication.NewMysql56GTIDSetFromSIDBlock(gtidBytes)
if err != nil {
return logFile, logPos, position, err
return logFile, logPos, position, vterrors.Wrapf(err, "error parsing GTID from BinlogDumpGTID packet")
}
// ComBinlogDumpGTID is a MySQL specific protocol. The GTID flavor is necessarily MySQL 56
position = replication.Position{GTIDSet: gtid}
}
if flags2&BinlogDumpNonBlock != 0 {
return logFile, logPos, position, io.EOF
Expand Down
5 changes: 4 additions & 1 deletion go/mysql/flavor_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,10 @@ func (mysqlFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilenam
}

// Build the command.
sidBlock := gtidSet.SIDBlock()
var sidBlock []byte
if gtidSet != nil {
sidBlock = gtidSet.SIDBlock()
}
var flags2 uint16
if binlogFilename != "" {
flags2 |= BinlogThroughPosition
Expand Down
9 changes: 5 additions & 4 deletions go/mysql/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func (c *Conn) AnalyzeSemiSyncAckRequest(buf []byte) (strippedBuf []byte, ackReq
// WriteComBinlogDumpGTID writes a ComBinlogDumpGTID command.
// Only works with MySQL 5.6+ (and not MariaDB).
// See http://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html for syntax.
func (c *Conn) WriteComBinlogDumpGTID(serverID uint32, binlogFilename string, binlogPos uint64, flags uint16, gtidSet []byte) error {
// sidBlock must be the result of a gtidSet.SIDBlock() function.
func (c *Conn) WriteComBinlogDumpGTID(serverID uint32, binlogFilename string, binlogPos uint64, flags uint16, sidBlock []byte) error {
c.sequence = 0
length := 1 + // ComBinlogDumpGTID
2 + // flags
Expand All @@ -90,16 +91,16 @@ func (c *Conn) WriteComBinlogDumpGTID(serverID uint32, binlogFilename string, bi
len(binlogFilename) + // binlog-filename
8 + // binlog-pos
4 + // data-size
len(gtidSet) // data
len(sidBlock) // data
data, pos := c.startEphemeralPacketWithHeader(length)
pos = writeByte(data, pos, ComBinlogDumpGTID) // nolint
pos = writeUint16(data, pos, flags) // nolint
pos = writeUint32(data, pos, serverID) // nolint
pos = writeUint32(data, pos, uint32(len(binlogFilename))) // nolint
pos = writeEOFString(data, pos, binlogFilename) // nolint
pos = writeUint64(data, pos, binlogPos) // nolint
pos = writeUint32(data, pos, uint32(len(gtidSet))) // nolint
pos += copy(data[pos:], gtidSet) // nolint
pos = writeUint32(data, pos, uint32(len(sidBlock))) // nolint
pos += copy(data[pos:], sidBlock) // nolint
if err := c.writeEphemeralPacket(); err != nil {
return sqlerror.NewSQLErrorf(sqlerror.CRServerGone, sqlerror.SSUnknownSQLState, "%v", err)
}
Expand Down
48 changes: 45 additions & 3 deletions go/mysql/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/test/utils"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
Expand Down Expand Up @@ -88,14 +89,50 @@ func TestComBinlogDumpGTID(t *testing.T) {
cConn.Close()
}()

t.Run("WriteComBinlogDumpGTIDEmptyGTID", func(t *testing.T) {
// Write ComBinlogDumpGTID packet, read it, compare.
var flags uint16 = 0x0d0e
err := cConn.WriteComBinlogDumpGTID(0x01020304, "moofarm", 0x05060708090a0b0c, flags, []byte{})
assert.NoError(t, err)
data, err := sConn.ReadPacket()
require.NoError(t, err, "sConn.ReadPacket - ComBinlogDumpGTID failed: %v", err)
require.NotEmpty(t, data)
require.EqualValues(t, data[0], ComBinlogDumpGTID)

expectedData := []byte{
ComBinlogDumpGTID,
0x0e, 0x0d, // flags
0x04, 0x03, 0x02, 0x01, // server-id
0x07, 0x00, 0x00, 0x00, // binlog-filename-len
'm', 'o', 'o', 'f', 'a', 'r', 'm', // bilog-filename
0x0c, 0x0b, 0x0a, 0x09, 0x08, 0x07, 0x06, 0x05, // binlog-pos
0x00, 0x00, 0x00, 0x00, // data-size is zero, no GTID payload
}
assert.Equal(t, expectedData, data)
logFile, logPos, pos, err := sConn.parseComBinlogDumpGTID(data)
require.NoError(t, err, "parseComBinlogDumpGTID failed: %v", err)
assert.Equal(t, "moofarm", logFile)
assert.Equal(t, uint64(0x05060708090a0b0c), logPos)
assert.True(t, pos.IsZero())
})

sConn.sequence = 0

t.Run("WriteComBinlogDumpGTID", func(t *testing.T) {
// Write ComBinlogDumpGTID packet, read it, compare.
var flags uint16 = 0x0d0e
assert.Equal(t, flags, flags|BinlogThroughGTID)
err := cConn.WriteComBinlogDumpGTID(0x01020304, "moofarm", 0x05060708090a0b0c, flags, []byte{0xfa, 0xfb})
gtidSet, err := replication.ParseMysql56GTIDSet("16b1039f-22b6-11ed-b765-0a43f95f28a3:1-243")
require.NoError(t, err)
sidBlock := gtidSet.SIDBlock()
assert.Len(t, sidBlock, 48)

err = cConn.WriteComBinlogDumpGTID(0x01020304, "moofarm", 0x05060708090a0b0c, flags, sidBlock)
assert.NoError(t, err)
data, err := sConn.ReadPacket()
require.NoError(t, err, "sConn.ReadPacket - ComBinlogDumpGTID failed: %v", err)
require.NotEmpty(t, data)
require.EqualValues(t, data[0], ComBinlogDumpGTID)

expectedData := []byte{
ComBinlogDumpGTID,
Expand All @@ -104,10 +141,15 @@ func TestComBinlogDumpGTID(t *testing.T) {
0x07, 0x00, 0x00, 0x00, // binlog-filename-len
'm', 'o', 'o', 'f', 'a', 'r', 'm', // bilog-filename
0x0c, 0x0b, 0x0a, 0x09, 0x08, 0x07, 0x06, 0x05, // binlog-pos
0x02, 0x00, 0x00, 0x00, // data-size
0xfa, 0xfb, // data
0x30, 0x00, 0x00, 0x00, // data-size
}
expectedData = append(expectedData, sidBlock...) // data
assert.Equal(t, expectedData, data)
logFile, logPos, pos, err := sConn.parseComBinlogDumpGTID(data)
require.NoError(t, err, "parseComBinlogDumpGTID failed: %v", err)
assert.Equal(t, "moofarm", logFile)
assert.Equal(t, uint64(0x05060708090a0b0c), logPos)
assert.Equal(t, gtidSet, pos.GTIDSet)
})

sConn.sequence = 0
Expand Down

0 comments on commit 1d57f39

Please sign in to comment.