Skip to content

Commit 6b56b45

Browse files
committed
feat_: hash based query for outgoing messages.
1 parent 133ad09 commit 6b56b45

File tree

3 files changed

+134
-10
lines changed

3 files changed

+134
-10
lines changed

protocol/messenger_peers.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ func (m *Messenger) AddStorePeer(address string) (peer.ID, error) {
1313
}
1414

1515
func (m *Messenger) AddRelayPeer(address string) (peer.ID, error) {
16-
return m.transport.AddStorePeer(address)
16+
return m.transport.AddRelayPeer(address)
1717
}
1818

1919
func (m *Messenger) DialPeer(address string) error {

wakuv2/common/message.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type MessageType int
2222
const (
2323
RelayedMessageType MessageType = iota
2424
StoreMessageType
25+
SendMessageType
2526
)
2627

2728
// MessageParams specifies the exact way a message should be wrapped
@@ -46,7 +47,7 @@ type ReceivedMessage struct {
4647
Padding []byte
4748
Signature []byte
4849

49-
Sent uint32 // Time when the message was posted into the network
50+
Sent uint32 // Time when the message was posted into the network in seconds
5051
Src *ecdsa.PublicKey // Message recipient (identity used to decode the message)
5152
Dst *ecdsa.PublicKey // Message recipient (identity used to decode the message)
5253

wakuv2/waku.go

+131-8
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package wakuv2
2020

2121
import (
22+
"bytes"
2223
"context"
2324
"crypto/ecdsa"
2425
"crypto/sha256"
@@ -66,6 +67,7 @@ import (
6667
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
6768
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange"
6869
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
70+
"github.com/waku-org/go-waku/waku/v2/protocol/store"
6971

7072
"github.com/status-im/status-go/connection"
7173
"github.com/status-im/status-go/eth-node/types"
@@ -128,6 +130,9 @@ type Waku struct {
128130
storeMsgIDs map[gethcommon.Hash]bool // Map of the currently processing ids
129131
storeMsgIDsMu sync.RWMutex
130132

133+
sendMsgIDs map[string]map[gethcommon.Hash]uint32
134+
sendMsgIDsMu sync.RWMutex
135+
131136
topicHealthStatusChan chan peermanager.TopicHealthStatus
132137
connStatusSubscriptions map[string]*types.ConnStatusSubscription
133138
connStatusMu sync.Mutex
@@ -208,6 +213,8 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge
208213
storeMsgIDs: make(map[gethcommon.Hash]bool),
209214
timesource: ts,
210215
storeMsgIDsMu: sync.RWMutex{},
216+
sendMsgIDs: make(map[string]map[gethcommon.Hash]uint32),
217+
sendMsgIDsMu: sync.RWMutex{},
211218
logger: logger,
212219
discV5BootstrapNodes: cfg.DiscV5BootstrapNodes,
213220
onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed,
@@ -1007,6 +1014,38 @@ func (w *Waku) broadcast() {
10071014
}
10081015
}
10091016

1017+
func (w *Waku) checkIfMessagesStored() {
1018+
ticker := time.NewTicker(time.Second * 5)
1019+
defer ticker.Stop()
1020+
1021+
for {
1022+
select {
1023+
case <-w.ctx.Done():
1024+
w.logger.Debug("Stop the look for message stored check")
1025+
return
1026+
case <-ticker.C:
1027+
w.logger.Debug("Running loop for messages stored check")
1028+
w.logger.Debug("Send Message IDs", zap.Any("sendMsgIDs", w.sendMsgIDs))
1029+
w.sendMsgIDsMu.Lock()
1030+
for pubsubTopic, subMsgs := range w.sendMsgIDs {
1031+
var queryMsgIds []gethcommon.Hash
1032+
for msgID, sendTime := range subMsgs {
1033+
// message is sent 5 seconds ago, check if it's stored
1034+
if uint32(w.timesource.Now().Unix()) > sendTime+5 {
1035+
queryMsgIds = append(queryMsgIds, msgID)
1036+
}
1037+
}
1038+
w.logger.Debug("Store query for message hashes", zap.Any("queryMsgIds", queryMsgIds), zap.String("pubsubTopic", pubsubTopic))
1039+
if len(queryMsgIds) > 0 {
1040+
w.messageHashBasedQuery(w.ctx, queryMsgIds, pubsubTopic)
1041+
}
1042+
}
1043+
1044+
w.sendMsgIDsMu.Unlock()
1045+
}
1046+
}
1047+
}
1048+
10101049
type publishFn = func(envelope *protocol.Envelope, logger *zap.Logger) error
10111050

10121051
func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publishFn, logger *zap.Logger) {
@@ -1016,14 +1055,11 @@ func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publishFn,
10161055
if err := publishFn(envelope, logger); err != nil {
10171056
logger.Error("could not send message", zap.Error(err))
10181057
event = common.EventEnvelopeExpired
1019-
} else {
1020-
event = common.EventEnvelopeSent
1058+
w.SendEnvelopeEvent(common.EnvelopeEvent{
1059+
Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()),
1060+
Event: event,
1061+
})
10211062
}
1022-
1023-
w.SendEnvelopeEvent(common.EnvelopeEvent{
1024-
Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()),
1025-
Event: event,
1026-
})
10271063
}
10281064

10291065
// Send injects a message into the waku send queue, to be distributed in the
@@ -1052,14 +1088,88 @@ func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) {
10521088
alreadyCached := w.envelopeCache.Has(gethcommon.BytesToHash(envelope.Hash().Bytes()))
10531089
w.poolMu.Unlock()
10541090
if !alreadyCached {
1055-
recvMessage := common.NewReceivedMessage(envelope, common.RelayedMessageType)
1091+
recvMessage := common.NewReceivedMessage(envelope, common.SendMessageType)
10561092
w.postEvent(recvMessage) // notify the local node about the new message
10571093
w.addEnvelope(recvMessage)
10581094
}
10591095

10601096
return envelope.Hash().Bytes(), nil
10611097
}
10621098

1099+
// ctx, peer, r.PubsubTopic, contentTopics, uint64(r.From), uint64(r.To), options, processEnvelopes
1100+
func (w *Waku) messageHashBasedQuery(ctx context.Context, hashes []gethcommon.Hash, pubsubTopic string) {
1101+
selectedPeers, err := w.node.PeerManager().SelectPeers(
1102+
peermanager.PeerSelectionCriteria{
1103+
SelectionType: peermanager.Automatic,
1104+
Proto: store.StoreQueryID_v300,
1105+
PubsubTopics: []string{pubsubTopic},
1106+
Ctx: ctx,
1107+
},
1108+
)
1109+
if err != nil {
1110+
w.logger.Warn("could not select peers", zap.Error(err))
1111+
return
1112+
}
1113+
1114+
var opts []store.RequestOption
1115+
requestID := protocol.GenerateRequestID()
1116+
opts = append(opts, store.WithRequestID(requestID))
1117+
opts = append(opts, store.WithPeer(selectedPeers[0]))
1118+
1119+
messageHashes := make([]pb.MessageHash, len(hashes))
1120+
for i, hash := range hashes {
1121+
messageHashes[i] = pb.ToMessageHash(hash.Bytes())
1122+
}
1123+
1124+
result, err := w.node.Store().QueryByHash(ctx, messageHashes, opts...)
1125+
if err != nil {
1126+
w.logger.Warn("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Error(err))
1127+
return
1128+
}
1129+
1130+
var ackHashes []gethcommon.Hash
1131+
var missedHashes []gethcommon.Hash
1132+
for _, hash := range hashes {
1133+
found := false
1134+
for _, msg := range result.Messages() {
1135+
if bytes.Equal(msg.GetMessageHash(), hash.Bytes()) {
1136+
found = true
1137+
break
1138+
}
1139+
}
1140+
if found {
1141+
ackHashes = append(ackHashes, hash)
1142+
} else {
1143+
missedHashes = append(missedHashes, hash)
1144+
}
1145+
1146+
subMsgs := w.sendMsgIDs[pubsubTopic]
1147+
delete(subMsgs, hash)
1148+
if len(subMsgs) == 0 {
1149+
delete(w.sendMsgIDs, pubsubTopic)
1150+
} else {
1151+
w.sendMsgIDs[pubsubTopic] = subMsgs
1152+
}
1153+
}
1154+
1155+
w.logger.Debug("Ack message hashes", zap.Any("ackHashes", ackHashes))
1156+
w.logger.Debug("Missed message hashes", zap.Any("missedHashes", missedHashes))
1157+
1158+
for _, hash := range ackHashes {
1159+
w.SendEnvelopeEvent(common.EnvelopeEvent{
1160+
Hash: hash,
1161+
Event: common.EventEnvelopeSent,
1162+
})
1163+
}
1164+
1165+
for _, hash := range missedHashes {
1166+
w.SendEnvelopeEvent(common.EnvelopeEvent{
1167+
Hash: hash,
1168+
Event: common.EventEnvelopeExpired,
1169+
})
1170+
}
1171+
}
1172+
10631173
func (w *Waku) query(ctx context.Context, peerID peer.ID, pubsubTopic string, topics []common.TopicType, from uint64, to uint64, requestID []byte, opts []legacy_store.HistoryRequestOption) (*legacy_store.Result, error) {
10641174

10651175
opts = append(opts, legacy_store.WithRequestID(requestID))
@@ -1263,6 +1373,8 @@ func (w *Waku) Start() error {
12631373

12641374
go w.broadcast()
12651375

1376+
go w.checkIfMessagesStored()
1377+
12661378
// we should wait `seedBootnodesForDiscV5` shutdown smoothly before set w.ctx to nil within `w.Stop()`
12671379
w.wg.Add(1)
12681380
go w.seedBootnodesForDiscV5()
@@ -1447,6 +1559,17 @@ func (w *Waku) processReceivedMessage(e *common.ReceivedMessage) {
14471559
w.storeMsgIDsMu.Unlock()
14481560
}
14491561

1562+
if e.MsgType == common.SendMessageType && !(*e.Envelope.Message().Ephemeral) {
1563+
w.sendMsgIDsMu.Lock()
1564+
subMsgs, ok := w.sendMsgIDs[e.PubsubTopic]
1565+
if !ok {
1566+
subMsgs = make(map[gethcommon.Hash]uint32)
1567+
}
1568+
subMsgs[e.Hash()] = e.Sent
1569+
w.sendMsgIDs[e.PubsubTopic] = subMsgs
1570+
w.sendMsgIDsMu.Unlock()
1571+
}
1572+
14501573
matched := w.filters.NotifyWatchers(e)
14511574

14521575
// If not matched we remove it

0 commit comments

Comments
 (0)