Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Give RowsEvent a method to inspect the underlying event type. #1016

Merged
38 changes: 38 additions & 0 deletions replication/row_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,7 @@ type RowsEvent struct {
// for mariadb *_COMPRESSED_EVENT_V1
compressed bool

// raw event type associated with a RowsEvent
eventType EventType

Table *TableMapEvent
Expand Down Expand Up @@ -950,6 +951,29 @@ type RowsEvent struct {
ignoreJSONDecodeErr bool
}

// EnumRowsEventType is an abridged type describing the operation which triggered the given RowsEvent.
type EnumRowsEventType byte

const (
EnumRowsEventTypeUnknown = EnumRowsEventType(iota)
EnumRowsEventTypeInsert
EnumRowsEventTypeUpdate
EnumRowsEventTypeDelete
)

func (t EnumRowsEventType) String() string {
switch t {
case EnumRowsEventTypeInsert:
return "insert"
case EnumRowsEventTypeUpdate:
return "update"
case EnumRowsEventTypeDelete:
return "delete"
default:
return fmt.Sprintf("unknown (%d)", t)
}
}

// EnumRowImageType is allowed types for every row in mysql binlog.
// See https://github.com/mysql/mysql-server/blob/1bfe02bdad6604d54913c62614bde57a055c8332/sql/rpl_record.h#L39
// enum class enum_row_image_type { WRITE_AI, UPDATE_BI, UPDATE_AI, DELETE_BI };
Expand Down Expand Up @@ -1120,6 +1144,19 @@ func (e *RowsEvent) Decode(data []byte) error {
return e.DecodeData(pos, data)
}

func (e *RowsEvent) Type() EnumRowsEventType {
switch e.eventType {
case WRITE_ROWS_EVENTv0, WRITE_ROWS_EVENTv1, WRITE_ROWS_EVENTv2, MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1:
return EnumRowsEventTypeInsert
case UPDATE_ROWS_EVENTv0, UPDATE_ROWS_EVENTv1, UPDATE_ROWS_EVENTv2, MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1:
return EnumRowsEventTypeUpdate
case DELETE_ROWS_EVENTv0, DELETE_ROWS_EVENTv1, DELETE_ROWS_EVENTv2, MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1:
return EnumRowsEventTypeDelete
default:
return EnumRowsEventTypeUnknown
}
}

func isBitSet(bitmap []byte, i int) bool {
return bitmap[i>>3]&(1<<(uint(i)&7)) > 0
}
Expand Down Expand Up @@ -1817,6 +1854,7 @@ func (e *RowsEvent) Dump(w io.Writer) {
fmt.Fprintf(w, "Flags: %d\n", e.Flags)
fmt.Fprintf(w, "Column count: %d\n", e.ColumnCount)
fmt.Fprintf(w, "NDB data: %s\n", e.NdbData)
fmt.Fprintf(w, "Event type: %s (%s)", e.Type(), e.eventType)

fmt.Fprintf(w, "Values:\n")
for _, rows := range e.Rows {
Expand Down
31 changes: 31 additions & 0 deletions replication/row_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1176,6 +1176,37 @@ func TestRowsDataExtraData(t *testing.T) {
}
}

func TestRowsEventType(t *testing.T) {
testcases := []struct {
eventType EventType
want EnumRowsEventType
}{
{WRITE_ROWS_EVENTv0, EnumRowsEventTypeInsert},
{WRITE_ROWS_EVENTv1, EnumRowsEventTypeInsert},
{WRITE_ROWS_EVENTv2, EnumRowsEventTypeInsert},
{MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1, EnumRowsEventTypeInsert},
{UPDATE_ROWS_EVENTv0, EnumRowsEventTypeUpdate},
{UPDATE_ROWS_EVENTv1, EnumRowsEventTypeUpdate},
{UPDATE_ROWS_EVENTv2, EnumRowsEventTypeUpdate},
{MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1, EnumRowsEventTypeUpdate},
{DELETE_ROWS_EVENTv0, EnumRowsEventTypeDelete},
{DELETE_ROWS_EVENTv1, EnumRowsEventTypeDelete},
{DELETE_ROWS_EVENTv2, EnumRowsEventTypeDelete},
{MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1, EnumRowsEventTypeDelete},

// Whoops, these are not rows events at all
{EXEC_LOAD_EVENT, EnumRowsEventTypeUnknown},
{HEARTBEAT_EVENT, EnumRowsEventTypeUnknown},
}

for _, tc := range testcases {
rev := new(RowsEvent)
rev.eventType = tc.eventType

require.Equal(t, tc.want, rev.Type())
}
}

func TestTableMapHelperMaps(t *testing.T) {
/*
CREATE TABLE `_types` (
Expand Down
15 changes: 15 additions & 0 deletions replication/transaction_payload_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ func TestTransactionPayloadEventDecode(t *testing.T) {
}
err := e.decodePayload()
require.NoError(t, err)

// Check raw events
require.Len(t, e.Events, 8)
require.Equal(t, QUERY_EVENT, e.Events[0].Header.EventType)
require.Equal(t, TABLE_MAP_EVENT, e.Events[1].Header.EventType)
Expand All @@ -78,4 +80,17 @@ func TestTransactionPayloadEventDecode(t *testing.T) {
require.Equal(t, TABLE_MAP_EVENT, e.Events[5].Header.EventType)
require.Equal(t, DELETE_ROWS_EVENTv2, e.Events[6].Header.EventType)
require.Equal(t, XID_EVENT, e.Events[7].Header.EventType)

// Check insert/update/delete rows events casting
ievent, ok := e.Events[2].Event.(*RowsEvent)
require.True(t, ok)
require.Equal(t, ievent.Type(), EnumRowsEventTypeInsert)

uevent, ok := e.Events[4].Event.(*RowsEvent)
require.True(t, ok)
require.Equal(t, uevent.Type(), EnumRowsEventTypeUpdate)

devent, ok := e.Events[6].Event.(*RowsEvent)
require.True(t, ok)
require.Equal(t, devent.Type(), EnumRowsEventTypeDelete)
}
Loading