Skip to content

Commit

Permalink
Expose SyncMessage
Browse files Browse the repository at this point in the history
I need to be able to access the changes in the message.
  • Loading branch information
ConradIrwin committed Jun 11, 2023
1 parent 425bfd8 commit e6d5194
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 27 deletions.
15 changes: 7 additions & 8 deletions automerge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ func TestSyncState(t *testing.T) {

resync := func() {
var valid1, valid2 bool
var m []byte
var m *automerge.SyncMessage
var err error

for {
Expand All @@ -689,15 +689,16 @@ func TestSyncState(t *testing.T) {
if !valid1 {
break
}
require.NoError(t, sState.ReceiveMessage(m))
_, err = sState.ReceiveMessage(m.Bytes())
require.NoError(t, err)

m, valid2 = sState.GenerateMessage()
require.NoError(t, err)
if !valid2 {
break
}

require.NoError(t, cState.ReceiveMessage(m))
_, err = cState.ReceiveMessage(m.Bytes())
require.NoError(t, err)
}
}

Expand All @@ -721,10 +722,8 @@ func TestSyncState(t *testing.T) {
require.Equal(t, cV, sV)
require.Equal(t, map[string]int{"s": 15, "c": 15}, cV)

cBytes, err := cState.Save()
require.NoError(t, err)
sBytes, err := sState.Save()
require.NoError(t, err)
cBytes := cState.Save()
sBytes := sState.Save()
cState, err = automerge.LoadSyncState(cDoc, cBytes)
require.NoError(t, err)
sState, err = automerge.LoadSyncState(sDoc, sBytes)
Expand Down
4 changes: 2 additions & 2 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ loop:
for {
msg, valid := syncState.GenerateMessage()
if valid {
send <- msg
send <- msg.Bytes()
}

select {
Expand All @@ -61,7 +61,7 @@ loop:
break loop
}

err := syncState.ReceiveMessage(msg)
_, err := syncState.ReceiveMessage(msg)
if err != nil {
panic(err)
}
Expand Down
4 changes: 2 additions & 2 deletions item.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,10 @@ func (i *item) syncState() *SyncState {
return ss
}

func (i *item) syncMessage() *syncMessage {
func (i *item) syncMessage() *SyncMessage {
defer runtime.KeepAlive(i)

ss := &syncMessage{item: i}
ss := &SyncMessage{item: i}
if !C.AMitemToSyncMessage(i.cItem, &ss.cSyncMessage) {
if i.Kind() == KindVoid {
return nil
Expand Down
46 changes: 31 additions & 15 deletions sync_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,56 +40,53 @@ func LoadSyncState(d *Doc, raw []byte) (*SyncState, error) {

// ReceiveMessage should be called with every message created by GenerateMessage
// on the peer side.
func (ss *SyncState) ReceiveMessage(msg []byte) error {
func (ss *SyncState) ReceiveMessage(msg []byte) (*SyncMessage, error) {
sm, err := loadSyncMessage(msg)
if err != nil {
return err
return nil, err
}

defer runtime.KeepAlive(ss)
defer runtime.KeepAlive(sm)
cDoc, unlock := ss.Doc.lock()
defer unlock()

return wrap(C.AMreceiveSyncMessage(cDoc, ss.cSyncState, sm.cSyncMessage)).void()
return sm, wrap(C.AMreceiveSyncMessage(cDoc, ss.cSyncState, sm.cSyncMessage)).void()
}

// GenerateMessage generates the next message to send to the client.
// If `valid` is false the clients are currently in sync and there are
// no more messages to send (until you either modify the underlying document)
func (ss *SyncState) GenerateMessage() (bytes []byte, valid bool) {
func (ss *SyncState) GenerateMessage() (sm *SyncMessage, valid bool) {
defer runtime.KeepAlive(ss)
cDoc, unlock := ss.Doc.lock()
defer unlock()

sm := must(wrap(C.AMgenerateSyncMessage(cDoc, ss.cSyncState)).item()).syncMessage()
sm = must(wrap(C.AMgenerateSyncMessage(cDoc, ss.cSyncState)).item()).syncMessage()

if sm == nil {
return nil, false
}

return sm.save(), true
return sm, true
}

// Save serializes the sync state so that you can resume it later.
// This is an optimization to reduce the number of round-trips required
// to get two peers in sync at a later date.
func (ss *SyncState) Save() ([]byte, error) {
func (ss *SyncState) Save() []byte {
defer runtime.KeepAlive(ss)

item, err := wrap(C.AMsyncStateEncode(ss.cSyncState)).item()
if err != nil {
return nil, err
}
return item.bytes(), nil
return must(wrap(C.AMsyncStateEncode(ss.cSyncState)).item()).bytes()
}

type syncMessage struct {
// SyncMessage is sent between peers to keep copies of a document in sync.
type SyncMessage struct {
item *item
cSyncMessage *C.AMsyncMessage
}

func loadSyncMessage(msg []byte) (*syncMessage, error) {
func loadSyncMessage(msg []byte) (*SyncMessage, error) {
cBytes, free := toByteSpan(msg)
defer free()

Expand All @@ -100,7 +97,26 @@ func loadSyncMessage(msg []byte) (*syncMessage, error) {
return item.syncMessage(), nil
}

func (sm *syncMessage) save() []byte {
// Changes returns any changes included in this SyncMessage
func (sm *SyncMessage) Changes() []*Change {
defer runtime.KeepAlive(sm)

items := must(wrap(C.AMsyncMessageChanges(sm.cSyncMessage)).items())

return mapItems(items, func(i *item) *Change { return i.change() })
}

// Heads gives the heads of the peer that generated the SyncMessage
func (sm *SyncMessage) Heads() []ChangeHash {
defer runtime.KeepAlive(sm)

items := must(wrap(C.AMsyncMessageHeads(sm.cSyncMessage)).items())

return mapItems(items, func(i *item) ChangeHash { return i.changeHash() })
}

// Bytes returns a representation for sending over the network.
func (sm *SyncMessage) Bytes() []byte {
if sm == nil {
return nil
}
Expand Down

0 comments on commit e6d5194

Please sign in to comment.