Skip to content

Commit 47899fd

Browse files
authored
feat_: hash based query for outgoing messages. (#5217)
* feat_: hash based query for outgoing messages. * chore_: more logs * chore_: fix comments * chore_: do not lock when send queries * chore_: use constant for magic number * chore_: remove message ids from query queue after ack * chore_: fix ack clean process * chore_: fix message resend test * chore_: add test for waku confirm message sent. * chore_: fix tests. * chore_: fix more * chore_: set store peer id when mailserver updates * fix_: tests * chore_: increase max hash query length * chore_: remove debug log of ack message * chore_: remove automatic peer selection * chore_: mark raw message to sent after ack * chore_: fix test * chore_: fix test
1 parent 27934a4 commit 47899fd

16 files changed

+355
-39
lines changed

api/messenger_raw_message_resend_test.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ func (s *MessengerRawMessageResendTest) TestMessageSent() {
211211
rawMessage, err := s.bobMessenger.RawMessageByID(ids[0])
212212
s.Require().NoError(err)
213213
s.Require().NotNil(rawMessage)
214-
if rawMessage.Sent {
214+
if rawMessage.SendCount > 0 {
215215
return nil
216216
}
217217
return errors.New("raw message should be sent finally")
@@ -227,12 +227,13 @@ func (s *MessengerRawMessageResendTest) TestMessageResend() {
227227
rawMessage, err := s.bobMessenger.RawMessageByID(ids[0])
228228
s.Require().NoError(err)
229229
s.Require().NotNil(rawMessage)
230-
s.Require().NoError(s.bobMessenger.UpdateRawMessageSent(rawMessage.ID, false, 0))
230+
s.Require().NoError(s.bobMessenger.UpdateRawMessageSent(rawMessage.ID, false))
231+
s.Require().NoError(s.bobMessenger.UpdateRawMessageLastSent(rawMessage.ID, 0))
231232
err = tt.RetryWithBackOff(func() error {
232233
rawMessage, err := s.bobMessenger.RawMessageByID(ids[0])
233234
s.Require().NoError(err)
234235
s.Require().NotNil(rawMessage)
235-
if !rawMessage.Sent {
236+
if rawMessage.SendCount < 2 {
236237
return errors.New("message ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN was not resent yet")
237238
}
238239
return nil

eth-node/bridge/geth/waku.go

+6
Original file line numberDiff line numberDiff line change
@@ -314,3 +314,9 @@ func GetWakuFilterFrom(f types.Filter) *wakucommon.Filter {
314314
func (w *wakuFilterWrapper) ID() string {
315315
return w.id
316316
}
317+
318+
func (w *GethWakuWrapper) ConfirmMessageDelivered(hashes []common.Hash) {
319+
}
320+
321+
func (w *GethWakuWrapper) SetStorePeerID(peerID peer.ID) {
322+
}

eth-node/bridge/geth/wakuv2.go

+8
Original file line numberDiff line numberDiff line change
@@ -333,3 +333,11 @@ func GetWakuV2FilterFrom(f types.Filter) *wakucommon.Filter {
333333
func (w *wakuV2FilterWrapper) ID() string {
334334
return w.id
335335
}
336+
337+
func (w *gethWakuV2Wrapper) ConfirmMessageDelivered(hashes []common.Hash) {
338+
w.waku.ConfirmMessageDelivered(hashes)
339+
}
340+
341+
func (w *gethWakuV2Wrapper) SetStorePeerID(peerID peer.ID) {
342+
w.waku.SetStorePeerID(peerID)
343+
}

eth-node/types/waku.go

+6
Original file line numberDiff line numberDiff line change
@@ -178,4 +178,10 @@ type Waku interface {
178178

179179
// ClearEnvelopesCache clears waku envelopes cache
180180
ClearEnvelopesCache()
181+
182+
// ConfirmMessageDelivered updates a message has been delivered in waku
183+
ConfirmMessageDelivered(hash []common.Hash)
184+
185+
// SetStorePeerID updates the peer id of store node
186+
SetStorePeerID(peerID peer.ID)
181187
}

protocol/common/raw_messages_persistence.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,12 @@ func (db *RawMessagesPersistence) RemoveMessageSegmentsCompletedOlderThan(timest
488488
return err
489489
}
490490

491-
func (db RawMessagesPersistence) UpdateRawMessageSent(id string, sent bool, lastSent uint64) error {
492-
_, err := db.db.Exec("UPDATE raw_messages SET sent = ?, last_sent = ? WHERE id = ?", sent, lastSent, id)
491+
func (db RawMessagesPersistence) UpdateRawMessageSent(id string, sent bool) error {
492+
_, err := db.db.Exec("UPDATE raw_messages SET sent = ? WHERE id = ?", sent, id)
493+
return err
494+
}
495+
496+
func (db RawMessagesPersistence) UpdateRawMessageLastSent(id string, lastSent uint64) error {
497+
_, err := db.db.Exec("UPDATE raw_messages SET last_sent = ? WHERE id = ?", lastSent, id)
493498
return err
494499
}

protocol/common/raw_messages_persistence_test.go

+42-13
Original file line numberDiff line numberDiff line change
@@ -54,30 +54,59 @@ func TestUpdateRawMessageSent(t *testing.T) {
5454
require.NoError(t, err)
5555

5656
rawMessageID := "1"
57-
err = p.SaveRawMessage(&RawMessage{
58-
ID: rawMessageID,
59-
ResendType: ResendTypeRawMessage,
60-
LocalChatID: "",
61-
CommunityID: []byte("c1"),
62-
CommunityKeyExMsgType: KeyExMsgRekey,
63-
Sender: pk,
64-
ResendMethod: ResendMethodSendPrivate,
65-
Recipients: []*ecdsa.PublicKey{pk.Public().(*ecdsa.PublicKey)},
66-
Sent: true,
67-
LastSent: uint64(time.Now().UnixNano() / int64(time.Millisecond)),
68-
})
57+
err = p.SaveRawMessage(buildRawMessage(rawMessageID, pk))
6958
require.NoError(t, err)
7059

7160
rawMessage, err := p.RawMessageByID(rawMessageID)
7261
require.NoError(t, err)
7362
require.True(t, rawMessage.Sent)
7463
require.Greater(t, rawMessage.LastSent, uint64(0))
7564

76-
err = p.UpdateRawMessageSent(rawMessageID, false, 0)
65+
err = p.UpdateRawMessageSent(rawMessageID, false)
7766
require.NoError(t, err)
7867

7968
m, err := p.RawMessageByID(rawMessageID)
8069
require.NoError(t, err)
8170
require.False(t, m.Sent)
71+
}
72+
73+
func TestUpdateRawMessageLastSent(t *testing.T) {
74+
db, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{})
75+
require.NoError(t, err)
76+
require.NoError(t, sqlite.Migrate(db))
77+
p := NewRawMessagesPersistence(db)
78+
79+
pk, err := crypto.GenerateKey()
80+
require.NoError(t, err)
81+
82+
rawMessageID := "1"
83+
err = p.SaveRawMessage(buildRawMessage(rawMessageID, pk))
84+
require.NoError(t, err)
85+
86+
rawMessage, err := p.RawMessageByID(rawMessageID)
87+
require.NoError(t, err)
88+
require.True(t, rawMessage.Sent)
89+
require.Greater(t, rawMessage.LastSent, uint64(0))
90+
91+
err = p.UpdateRawMessageLastSent(rawMessageID, 0)
92+
require.NoError(t, err)
93+
94+
m, err := p.RawMessageByID(rawMessageID)
95+
require.NoError(t, err)
8296
require.Equal(t, m.LastSent, uint64(0))
8397
}
98+
99+
func buildRawMessage(rawMessageID string, pk *ecdsa.PrivateKey) *RawMessage {
100+
return &RawMessage{
101+
ID: rawMessageID,
102+
ResendType: ResendTypeRawMessage,
103+
LocalChatID: "",
104+
CommunityID: []byte("c1"),
105+
CommunityKeyExMsgType: KeyExMsgRekey,
106+
Sender: pk,
107+
ResendMethod: ResendMethodSendPrivate,
108+
Recipients: []*ecdsa.PublicKey{pk.Public().(*ecdsa.PublicKey)},
109+
Sent: true,
110+
LastSent: uint64(time.Now().UnixNano() / int64(time.Millisecond)),
111+
}
112+
}

protocol/messenger_mailserver_cycle.go

+5
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,11 @@ func (m *Messenger) connectToMailserver(ms mailservers.Mailserver) error {
426426
m.logger.Info("mailserver available", zap.String("address", m.mailserverCycle.activeMailserver.UniqueID()))
427427
m.EmitMailserverAvailable()
428428
signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver.Address, m.mailserverCycle.activeMailserver.ID)
429+
peerID, err := m.mailserverCycle.activeMailserver.PeerID()
430+
if err != nil {
431+
m.logger.Error("could not decode the peer id of mailserver", zap.Error(err))
432+
}
433+
m.transport.SetStorePeerID(peerID)
429434

430435
// Query mailserver
431436
if m.config.codeControlFlags.AutoRequestHistoricMessages {

protocol/messenger_messages_tracking_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ func (s *MessengerMessagesTrackingSuite) testMessageMarkedAsSent(textSize int) {
172172
// Message should be marked as sent eventually
173173
err = tt.RetryWithBackOff(func() error {
174174
rawMessage, err = s.bob.persistence.RawMessageByID(inputMessage.ID)
175-
if err != nil || !rawMessage.Sent {
175+
if err != nil || rawMessage.SendCount < 1 {
176176
return errors.New("message not marked as sent")
177177
}
178178
return nil

protocol/messenger_peers.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ func (m *Messenger) AddStorePeer(address string) (peer.ID, error) {
1313
}
1414

1515
func (m *Messenger) AddRelayPeer(address string) (peer.ID, error) {
16-
return m.transport.AddStorePeer(address)
16+
return m.transport.AddRelayPeer(address)
1717
}
1818

1919
func (m *Messenger) DialPeer(address string) error {

protocol/messenger_peersyncing.go

+9
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,20 @@ func (m *Messenger) markDeliveredMessages(acks [][]byte) {
3838
messageID := messageIDBytes.String()
3939
//mark messages as delivered
4040

41+
m.logger.Debug("got datasync acknowledge for message", zap.String("ack", hex.EncodeToString(ack)), zap.String("messageID", messageID))
42+
4143
err = m.UpdateMessageOutgoingStatus(messageID, common.OutgoingStatusDelivered)
4244
if err != nil {
4345
m.logger.Debug("Can't set message status as delivered", zap.Error(err))
4446
}
4547

48+
err = m.UpdateRawMessageSent(messageID, true)
49+
if err != nil {
50+
m.logger.Debug("can't set raw message as sent", zap.Error(err))
51+
}
52+
53+
m.transport.ConfirmMessageDelivered(messageID)
54+
4655
//send signal to client that message status updated
4756
if m.config.messengerSignalsHandler != nil {
4857
message, err := m.persistence.MessageByID(messageID)

protocol/messenger_raw_message_resend.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,10 @@ func (m *Messenger) RawMessageByID(id string) (*common.RawMessage, error) {
200200
return m.persistence.RawMessageByID(id)
201201
}
202202

203-
func (m *Messenger) UpdateRawMessageSent(id string, sent bool, lastSent uint64) error {
204-
return m.persistence.UpdateRawMessageSent(id, sent, lastSent)
203+
func (m *Messenger) UpdateRawMessageSent(id string, sent bool) error {
204+
return m.persistence.UpdateRawMessageSent(id, sent)
205+
}
206+
207+
func (m *Messenger) UpdateRawMessageLastSent(id string, lastSent uint64) error {
208+
return m.persistence.UpdateRawMessageLastSent(id, lastSent)
205209
}

protocol/transport/envelopes_monitor.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func (m *EnvelopesMonitor) Add(messageIDs [][]byte, envelopeHashes []types.Hash,
136136
defer m.mu.Unlock()
137137

138138
for _, messageID := range messageIDs {
139-
m.messageEnvelopeHashes[string(messageID)] = envelopeHashes
139+
m.messageEnvelopeHashes[types.HexBytes(messageID).String()] = envelopeHashes
140140
}
141141

142142
for i, envelopeHash := range envelopeHashes {
@@ -399,7 +399,7 @@ func (m *EnvelopesMonitor) processMessageIDs(messageIDs [][]byte) {
399399
sentMessageIDs := make([][]byte, 0, len(messageIDs))
400400

401401
for _, messageID := range messageIDs {
402-
hashes, ok := m.messageEnvelopeHashes[string(messageID)]
402+
hashes, ok := m.messageEnvelopeHashes[types.HexBytes(messageID).String()]
403403
if !ok {
404404
continue
405405
}
@@ -432,6 +432,6 @@ func (m *EnvelopesMonitor) clearMessageState(envelopeID types.Hash) {
432432
}
433433
delete(m.envelopes, envelopeID)
434434
for _, messageID := range envelope.messageIDs {
435-
delete(m.messageEnvelopeHashes, string(messageID))
435+
delete(m.messageEnvelopeHashes, types.HexBytes(messageID).String())
436436
}
437437
}

protocol/transport/transport.go

+19
Original file line numberDiff line numberDiff line change
@@ -717,3 +717,22 @@ func (t *Transport) RemovePubsubTopicKey(topic string) error {
717717
}
718718
return nil
719719
}
720+
721+
func (t *Transport) ConfirmMessageDelivered(messageID string) {
722+
if t.envelopesMonitor == nil {
723+
return
724+
}
725+
hashes, ok := t.envelopesMonitor.messageEnvelopeHashes[messageID]
726+
if !ok {
727+
return
728+
}
729+
commHashes := make([]common.Hash, len(hashes))
730+
for i, h := range hashes {
731+
commHashes[i] = common.BytesToHash(h[:])
732+
}
733+
t.waku.ConfirmMessageDelivered(commHashes)
734+
}
735+
736+
func (t *Transport) SetStorePeerID(peerID peer.ID) {
737+
t.waku.SetStorePeerID(peerID)
738+
}

wakuv2/common/message.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type MessageType int
2222
const (
2323
RelayedMessageType MessageType = iota
2424
StoreMessageType
25+
SendMessageType
2526
)
2627

2728
// MessageParams specifies the exact way a message should be wrapped
@@ -46,7 +47,7 @@ type ReceivedMessage struct {
4647
Padding []byte
4748
Signature []byte
4849

49-
Sent uint32 // Time when the message was posted into the network
50+
Sent uint32 // Time when the message was posted into the network in seconds
5051
Src *ecdsa.PublicKey // Message recipient (identity used to decode the message)
5152
Dst *ecdsa.PublicKey // Message recipient (identity used to decode the message)
5253

0 commit comments

Comments
 (0)