Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit b7b8f38

Browse files
committedFeb 18, 2025·
updates
1 parent a7e4280 commit b7b8f38

File tree

1 file changed

+115
-8
lines changed

1 file changed

+115
-8
lines changed
 

‎replication/event.go

+115-8
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,7 @@ func (e *QueryEvent) Dump(w io.Writer) {
421421
type GTIDEvent struct {
422422
CommitFlag uint8
423423
SID []byte
424+
Tag string
424425
GNO int64
425426
LastCommitted int64
426427
SequenceNumber int64
@@ -513,7 +514,11 @@ func (e *GTIDEvent) Dump(w io.Writer) {
513514

514515
fmt.Fprintf(w, "Commit flag: %d\n", e.CommitFlag)
515516
u, _ := uuid.FromBytes(e.SID)
516-
fmt.Fprintf(w, "GTID_NEXT: %s:%d\n", u.String(), e.GNO)
517+
if e.Tag != "" {
518+
fmt.Fprintf(w, "GTID_NEXT: %s:%s:%d\n", u.String(), e.Tag, e.GNO)
519+
} else {
520+
fmt.Fprintf(w, "GTID_NEXT: %s:%s%d\n", u.String(), e.GNO)
521+
}
517522
fmt.Fprintf(w, "LAST_COMMITTED: %d\n", e.LastCommitted)
518523
fmt.Fprintf(w, "SEQUENCE_NUMBER: %d\n", e.SequenceNumber)
519524
fmt.Fprintf(w, "Immediate commmit timestamp: %d (%s)\n", e.ImmediateCommitTimestamp, fmtTime(e.ImmediateCommitTime()))
@@ -544,7 +549,10 @@ func (e *GTIDEvent) OriginalCommitTime() time.Time {
544549
return microSecTimestampToTime(e.OriginalCommitTimestamp)
545550
}
546551

552+
// GtidTaggedLogEvent is for a GTID event with a tag.
553+
// This is similar to GTIDEvent, but it has a tag and uses a different serialization format.
547554
type GtidTaggedLogEvent struct {
555+
GTIDEvent
548556
msg serialization.Message
549557
}
550558

@@ -615,20 +623,119 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error {
615623
return err
616624
}
617625

618-
return nil
619-
}
626+
f, err := e.msg.GetFieldByName("gtid_flags")
627+
if err != nil {
628+
return err
629+
}
630+
if v, ok := f.Type.(serialization.FieldIntFixed); ok {
631+
e.CommitFlag = v.Value[0]
632+
} else {
633+
return errors.New("failed to get gtid_flags field")
634+
}
620635

621-
func (e *GtidTaggedLogEvent) Dump(w io.Writer) {
622-
fmt.Println(e.msg.String())
636+
f, err = e.msg.GetFieldByName("uuid")
637+
if err != nil {
638+
return err
639+
}
640+
if v, ok := f.Type.(serialization.FieldIntFixed); ok {
641+
e.SID = v.Value
642+
} else {
643+
return errors.New("failed to get uuid field")
644+
}
623645

624-
f, err := e.msg.GetFieldByName("immediate_server_version")
646+
f, err = e.msg.GetFieldByName("gno")
625647
if err != nil {
626-
return
648+
return err
649+
}
650+
if v, ok := f.Type.(serialization.FieldIntVar); ok {
651+
e.GNO = v.Value
652+
} else {
653+
return errors.New("failed to get gno field")
627654
}
628655

656+
f, err = e.msg.GetFieldByName("tag")
657+
if err != nil {
658+
return err
659+
}
660+
if v, ok := f.Type.(serialization.FieldString); ok {
661+
e.Tag = v.Value
662+
} else {
663+
return errors.New("failed to get tag field")
664+
}
665+
666+
f, err = e.msg.GetFieldByName("last_committed")
667+
if err != nil {
668+
return err
669+
}
629670
if v, ok := f.Type.(serialization.FieldIntVar); ok {
630-
fmt.Printf("Immediate server version: %d\n", v.Value)
671+
e.LastCommitted = v.Value
672+
} else {
673+
return errors.New("failed to get last_comitted field")
674+
}
675+
676+
f, err = e.msg.GetFieldByName("sequence_number")
677+
if err != nil {
678+
return err
631679
}
680+
if v, ok := f.Type.(serialization.FieldIntVar); ok {
681+
e.SequenceNumber = v.Value
682+
} else {
683+
return errors.New("failed to get sequence_number field")
684+
}
685+
686+
f, err = e.msg.GetFieldByName("immediate_commit_timestamp")
687+
if err != nil {
688+
return err
689+
}
690+
if v, ok := f.Type.(serialization.FieldUintVar); ok {
691+
e.ImmediateCommitTimestamp = v.Value
692+
} else {
693+
return errors.New("failed to get immediate_commit_timestamp field")
694+
}
695+
696+
f, err = e.msg.GetFieldByName("original_commit_timestamp")
697+
if err != nil {
698+
return err
699+
}
700+
if v, ok := f.Type.(serialization.FieldUintVar); ok {
701+
e.OriginalCommitTimestamp = v.Value
702+
} else {
703+
return errors.New("failed to get original_commit_timestamp field")
704+
}
705+
706+
f, err = e.msg.GetFieldByName("immediate_server_version")
707+
if err != nil {
708+
return err
709+
}
710+
if v, ok := f.Type.(serialization.FieldUintVar); ok {
711+
e.ImmediateServerVersion = uint32(v.Value)
712+
} else {
713+
return errors.New("failed to get immediate_server_version field")
714+
}
715+
716+
f, err = e.msg.GetFieldByName("original_server_version")
717+
if err != nil {
718+
return err
719+
}
720+
if v, ok := f.Type.(serialization.FieldUintVar); ok {
721+
e.OriginalServerVersion = uint32(v.Value)
722+
} else {
723+
return errors.New("failed to get original_server_version field")
724+
}
725+
726+
f, err = e.msg.GetFieldByName("transaction_length")
727+
if err != nil {
728+
return err
729+
}
730+
if v, ok := f.Type.(serialization.FieldUintVar); ok {
731+
e.TransactionLength = v.Value
732+
} else {
733+
return errors.New("failed to get transaction_length field")
734+
}
735+
736+
// TODO: add and test commit_group_ticket
737+
738+
return nil
632739
}
633740

634741
type BeginLoadQueryEvent struct {

0 commit comments

Comments
 (0)
Please sign in to comment.