19
19
package wakuv2
20
20
21
21
import (
22
+ "bytes"
22
23
"context"
23
24
"crypto/ecdsa"
24
25
"crypto/sha256"
@@ -65,6 +66,7 @@ import (
65
66
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
66
67
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange"
67
68
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
69
+ "github.com/waku-org/go-waku/waku/v2/protocol/store"
68
70
69
71
"github.com/status-im/status-go/connection"
70
72
"github.com/status-im/status-go/eth-node/types"
@@ -126,6 +128,9 @@ type Waku struct {
126
128
storeMsgIDs map [gethcommon.Hash ]bool // Map of the currently processing ids
127
129
storeMsgIDsMu sync.RWMutex
128
130
131
+ sendMsgIDs map [string ]map [gethcommon.Hash ]uint32
132
+ sendMsgIDsMu sync.RWMutex
133
+
129
134
topicHealthStatusChan chan peermanager.TopicHealthStatus
130
135
connStatusSubscriptions map [string ]* types.ConnStatusSubscription
131
136
connStatusMu sync.Mutex
@@ -206,6 +211,8 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge
206
211
storeMsgIDs : make (map [gethcommon.Hash ]bool ),
207
212
timesource : ts ,
208
213
storeMsgIDsMu : sync.RWMutex {},
214
+ sendMsgIDs : make (map [string ]map [gethcommon.Hash ]uint32 ),
215
+ sendMsgIDsMu : sync.RWMutex {},
209
216
logger : logger ,
210
217
discV5BootstrapNodes : cfg .DiscV5BootstrapNodes ,
211
218
onHistoricMessagesRequestFailed : onHistoricMessagesRequestFailed ,
@@ -970,6 +977,38 @@ func (w *Waku) broadcast() {
970
977
}
971
978
}
972
979
980
+ func (w * Waku ) checkIfMessagesStored () {
981
+ ticker := time .NewTicker (time .Second * 5 )
982
+ defer ticker .Stop ()
983
+
984
+ for {
985
+ select {
986
+ case <- w .ctx .Done ():
987
+ w .logger .Debug ("Stop the look for message stored check" )
988
+ return
989
+ case <- ticker .C :
990
+ w .logger .Debug ("Running loop for messages stored check" )
991
+ w .logger .Debug ("Send Message IDs" , zap .Any ("sendMsgIDs" , w .sendMsgIDs ))
992
+ w .sendMsgIDsMu .Lock ()
993
+ for pubsubTopic , subMsgs := range w .sendMsgIDs {
994
+ var queryMsgIds []gethcommon.Hash
995
+ for msgID , sendTime := range subMsgs {
996
+ // message is sent 5 seconds ago, check if it's stored
997
+ if uint32 (w .timesource .Now ().Unix ()) > sendTime + 5 {
998
+ queryMsgIds = append (queryMsgIds , msgID )
999
+ }
1000
+ }
1001
+ w .logger .Debug ("Store query for message hashes" , zap .Any ("queryMsgIds" , queryMsgIds ), zap .String ("pubsubTopic" , pubsubTopic ))
1002
+ if len (queryMsgIds ) > 0 {
1003
+ w .messageHashBasedQuery (w .ctx , queryMsgIds , pubsubTopic )
1004
+ }
1005
+ }
1006
+
1007
+ w .sendMsgIDsMu .Unlock ()
1008
+ }
1009
+ }
1010
+ }
1011
+
973
1012
type publishFn = func (envelope * protocol.Envelope , logger * zap.Logger ) error
974
1013
975
1014
func (w * Waku ) publishEnvelope (envelope * protocol.Envelope , publishFn publishFn , logger * zap.Logger ) {
@@ -979,14 +1018,11 @@ func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publishFn,
979
1018
if err := publishFn (envelope , logger ); err != nil {
980
1019
logger .Error ("could not send message" , zap .Error (err ))
981
1020
event = common .EventEnvelopeExpired
982
- } else {
983
- event = common .EventEnvelopeSent
1021
+ w .SendEnvelopeEvent (common.EnvelopeEvent {
1022
+ Hash : gethcommon .BytesToHash (envelope .Hash ().Bytes ()),
1023
+ Event : event ,
1024
+ })
984
1025
}
985
-
986
- w .SendEnvelopeEvent (common.EnvelopeEvent {
987
- Hash : gethcommon .BytesToHash (envelope .Hash ().Bytes ()),
988
- Event : event ,
989
- })
990
1026
}
991
1027
992
1028
// Send injects a message into the waku send queue, to be distributed in the
@@ -1015,14 +1051,88 @@ func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) {
1015
1051
alreadyCached := w .envelopeCache .Has (gethcommon .BytesToHash (envelope .Hash ().Bytes ()))
1016
1052
w .poolMu .Unlock ()
1017
1053
if ! alreadyCached {
1018
- recvMessage := common .NewReceivedMessage (envelope , common .RelayedMessageType )
1054
+ recvMessage := common .NewReceivedMessage (envelope , common .SendMessageType )
1019
1055
w .postEvent (recvMessage ) // notify the local node about the new message
1020
1056
w .addEnvelope (recvMessage )
1021
1057
}
1022
1058
1023
1059
return envelope .Hash ().Bytes (), nil
1024
1060
}
1025
1061
1062
+ // ctx, peer, r.PubsubTopic, contentTopics, uint64(r.From), uint64(r.To), options, processEnvelopes
1063
+ func (w * Waku ) messageHashBasedQuery (ctx context.Context , hashes []gethcommon.Hash , pubsubTopic string ) {
1064
+ selectedPeers , err := w .node .PeerManager ().SelectPeers (
1065
+ peermanager.PeerSelectionCriteria {
1066
+ SelectionType : peermanager .Automatic ,
1067
+ Proto : store .StoreQueryID_v300 ,
1068
+ PubsubTopics : []string {pubsubTopic },
1069
+ Ctx : ctx ,
1070
+ },
1071
+ )
1072
+ if err != nil {
1073
+ w .logger .Warn ("could not select peers" , zap .Error (err ))
1074
+ return
1075
+ }
1076
+
1077
+ var opts []store.RequestOption
1078
+ requestID := protocol .GenerateRequestID ()
1079
+ opts = append (opts , store .WithRequestID (requestID ))
1080
+ opts = append (opts , store .WithPeer (selectedPeers [0 ]))
1081
+
1082
+ messageHashes := make ([]pb.MessageHash , len (hashes ))
1083
+ for i , hash := range hashes {
1084
+ messageHashes [i ] = pb .ToMessageHash (hash .Bytes ())
1085
+ }
1086
+
1087
+ result , err := w .node .Store ().QueryByHash (ctx , messageHashes , opts ... )
1088
+ if err != nil {
1089
+ w .logger .Warn ("store.queryByHash failed" , zap .String ("requestID" , hexutil .Encode (requestID )), zap .Error (err ))
1090
+ return
1091
+ }
1092
+
1093
+ var ackHashes []gethcommon.Hash
1094
+ var missedHashes []gethcommon.Hash
1095
+ for _ , hash := range hashes {
1096
+ found := false
1097
+ for _ , msg := range result .Messages () {
1098
+ if bytes .Equal (msg .GetMessageHash (), hash .Bytes ()) {
1099
+ found = true
1100
+ break
1101
+ }
1102
+ }
1103
+ if found {
1104
+ ackHashes = append (ackHashes , hash )
1105
+ } else {
1106
+ missedHashes = append (missedHashes , hash )
1107
+ }
1108
+
1109
+ subMsgs := w .sendMsgIDs [pubsubTopic ]
1110
+ delete (subMsgs , hash )
1111
+ if len (subMsgs ) == 0 {
1112
+ delete (w .sendMsgIDs , pubsubTopic )
1113
+ } else {
1114
+ w .sendMsgIDs [pubsubTopic ] = subMsgs
1115
+ }
1116
+ }
1117
+
1118
+ w .logger .Debug ("Ack message hashes" , zap .Any ("ackHashes" , ackHashes ))
1119
+ w .logger .Debug ("Missed message hashes" , zap .Any ("missedHashes" , missedHashes ))
1120
+
1121
+ for _ , hash := range ackHashes {
1122
+ w .SendEnvelopeEvent (common.EnvelopeEvent {
1123
+ Hash : hash ,
1124
+ Event : common .EventEnvelopeSent ,
1125
+ })
1126
+ }
1127
+
1128
+ for _ , hash := range missedHashes {
1129
+ w .SendEnvelopeEvent (common.EnvelopeEvent {
1130
+ Hash : hash ,
1131
+ Event : common .EventEnvelopeExpired ,
1132
+ })
1133
+ }
1134
+ }
1135
+
1026
1136
func (w * Waku ) query (ctx context.Context , peerID peer.ID , pubsubTopic string , topics []common.TopicType , from uint64 , to uint64 , requestID []byte , opts []legacy_store.HistoryRequestOption ) (* legacy_store.Result , error ) {
1027
1137
1028
1138
if len (requestID ) != 0 {
@@ -1203,6 +1313,8 @@ func (w *Waku) Start() error {
1203
1313
1204
1314
go w .broadcast ()
1205
1315
1316
+ go w .checkIfMessagesStored ()
1317
+
1206
1318
// we should wait `seedBootnodesForDiscV5` shutdown smoothly before set w.ctx to nil within `w.Stop()`
1207
1319
w .wg .Add (1 )
1208
1320
go w .seedBootnodesForDiscV5 ()
@@ -1382,6 +1494,17 @@ func (w *Waku) processReceivedMessage(e *common.ReceivedMessage) {
1382
1494
w .storeMsgIDsMu .Unlock ()
1383
1495
}
1384
1496
1497
+ if e .MsgType == common .SendMessageType && ! (* e .Envelope .Message ().Ephemeral ) {
1498
+ w .sendMsgIDsMu .Lock ()
1499
+ subMsgs , ok := w .sendMsgIDs [e .PubsubTopic ]
1500
+ if ! ok {
1501
+ subMsgs = make (map [gethcommon.Hash ]uint32 )
1502
+ }
1503
+ subMsgs [e .Hash ()] = e .Sent
1504
+ w .sendMsgIDs [e .PubsubTopic ] = subMsgs
1505
+ w .sendMsgIDsMu .Unlock ()
1506
+ }
1507
+
1385
1508
matched := w .filters .NotifyWatchers (e )
1386
1509
1387
1510
// If not matched we remove it
0 commit comments