@@ -131,6 +131,8 @@ type Waku struct {
131
131
sendMsgIDs map [string ]map [gethcommon.Hash ]uint32
132
132
sendMsgIDsMu sync.RWMutex
133
133
134
+ storePeerID peer.ID
135
+
134
136
topicHealthStatusChan chan peermanager.TopicHealthStatus
135
137
connStatusSubscriptions map [string ]* types.ConnStatusSubscription
136
138
connStatusMu sync.Mutex
@@ -993,6 +995,9 @@ func (w *Waku) checkIfMessagesStored() {
993
995
for pubsubTopic , subMsgs := range w .sendMsgIDs {
994
996
var queryMsgIds []gethcommon.Hash
995
997
for msgID , sendTime := range subMsgs {
998
+ if len (queryMsgIds ) >= 20 {
999
+ break
1000
+ }
996
1001
// message is sent 5 seconds ago, check if it's stored
997
1002
if uint32 (w .timesource .Now ().Unix ()) > sendTime + 5 {
998
1003
queryMsgIds = append (queryMsgIds , msgID )
@@ -1061,35 +1066,43 @@ func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) {
1061
1066
1062
1067
// ctx, peer, r.PubsubTopic, contentTopics, uint64(r.From), uint64(r.To), options, processEnvelopes
1063
1068
func (w * Waku ) messageHashBasedQuery (ctx context.Context , hashes []gethcommon.Hash , pubsubTopic string ) {
1064
- selectedPeers , err := w .node .PeerManager ().SelectPeers (
1065
- peermanager.PeerSelectionCriteria {
1066
- SelectionType : peermanager .Automatic ,
1067
- Proto : store .StoreQueryID_v300 ,
1068
- PubsubTopics : []string {pubsubTopic },
1069
- Ctx : ctx ,
1070
- },
1071
- )
1072
- if err != nil {
1073
- w .logger .Warn ("could not select peers" , zap .Error (err ))
1074
- return
1069
+ selectedPeer := w .storePeerID
1070
+ if selectedPeer == "" {
1071
+ selectedPeers , err := w .node .PeerManager ().SelectPeers (
1072
+ peermanager.PeerSelectionCriteria {
1073
+ SelectionType : peermanager .Automatic ,
1074
+ Proto : store .StoreQueryID_v300 ,
1075
+ PubsubTopics : []string {pubsubTopic },
1076
+ Ctx : ctx ,
1077
+ },
1078
+ )
1079
+ if err != nil {
1080
+ w .logger .Error ("could not select peers" , zap .Error (err ))
1081
+ return
1082
+ }
1083
+ selectedPeer = selectedPeers [0 ]
1075
1084
}
1076
1085
1077
1086
var opts []store.RequestOption
1078
1087
requestID := protocol .GenerateRequestID ()
1079
1088
opts = append (opts , store .WithRequestID (requestID ))
1080
- opts = append (opts , store .WithPeer (selectedPeers [ 0 ] ))
1089
+ opts = append (opts , store .WithPeer (selectedPeer ))
1081
1090
1082
1091
messageHashes := make ([]pb.MessageHash , len (hashes ))
1083
1092
for i , hash := range hashes {
1084
1093
messageHashes [i ] = pb .ToMessageHash (hash .Bytes ())
1085
1094
}
1086
1095
1096
+ w .logger .Debug ("store.queryByHash request" , zap .String ("requestID" , hexutil .Encode (requestID )), zap .String ("peerID" , selectedPeer .String ()), zap .Any ("messageHashes" , messageHashes ))
1097
+
1087
1098
result , err := w .node .Store ().QueryByHash (ctx , messageHashes , opts ... )
1088
1099
if err != nil {
1089
- w .logger .Warn ("store.queryByHash failed" , zap .String ("requestID" , hexutil .Encode (requestID )), zap .Error (err ))
1100
+ w .logger .Error ("store.queryByHash failed" , zap .String ("requestID" , hexutil .Encode (requestID )), zap . String ( "peerID" , selectedPeer . String ( )), zap .Error (err ))
1090
1101
return
1091
1102
}
1092
1103
1104
+ w .logger .Debug ("store.queryByHash result" , zap .String ("requestID" , hexutil .Encode (requestID )), zap .Int ("messages" , len (result .Messages ())))
1105
+
1093
1106
var ackHashes []gethcommon.Hash
1094
1107
var missedHashes []gethcommon.Hash
1095
1108
for _ , hash := range hashes {
@@ -1100,10 +1113,19 @@ func (w *Waku) messageHashBasedQuery(ctx context.Context, hashes []gethcommon.Ha
1100
1113
break
1101
1114
}
1102
1115
}
1116
+
1103
1117
if found {
1104
1118
ackHashes = append (ackHashes , hash )
1119
+ w .SendEnvelopeEvent (common.EnvelopeEvent {
1120
+ Hash : hash ,
1121
+ Event : common .EventEnvelopeSent ,
1122
+ })
1105
1123
} else {
1106
1124
missedHashes = append (missedHashes , hash )
1125
+ w .SendEnvelopeEvent (common.EnvelopeEvent {
1126
+ Hash : hash ,
1127
+ Event : common .EventEnvelopeExpired ,
1128
+ })
1107
1129
}
1108
1130
1109
1131
subMsgs := w .sendMsgIDs [pubsubTopic ]
@@ -1117,20 +1139,6 @@ func (w *Waku) messageHashBasedQuery(ctx context.Context, hashes []gethcommon.Ha
1117
1139
1118
1140
w .logger .Debug ("Ack message hashes" , zap .Any ("ackHashes" , ackHashes ))
1119
1141
w .logger .Debug ("Missed message hashes" , zap .Any ("missedHashes" , missedHashes ))
1120
-
1121
- for _ , hash := range ackHashes {
1122
- w .SendEnvelopeEvent (common.EnvelopeEvent {
1123
- Hash : hash ,
1124
- Event : common .EventEnvelopeSent ,
1125
- })
1126
- }
1127
-
1128
- for _ , hash := range missedHashes {
1129
- w .SendEnvelopeEvent (common.EnvelopeEvent {
1130
- Hash : hash ,
1131
- Event : common .EventEnvelopeExpired ,
1132
- })
1133
- }
1134
1142
}
1135
1143
1136
1144
func (w * Waku ) query (ctx context.Context , peerID peer.ID , pubsubTopic string , topics []common.TopicType , from uint64 , to uint64 , requestID []byte , opts []legacy_store.HistoryRequestOption ) (* legacy_store.Result , error ) {
@@ -1165,6 +1173,7 @@ func (w *Waku) query(ctx context.Context, peerID peer.ID, pubsubTopic string, to
1165
1173
}
1166
1174
1167
1175
func (w * Waku ) Query (ctx context.Context , peerID peer.ID , pubsubTopic string , topics []common.TopicType , from uint64 , to uint64 , opts []legacy_store.HistoryRequestOption , processEnvelopes bool ) (cursor * storepb.Index , envelopesCount int , err error ) {
1176
+ w .storePeerID = peerID
1168
1177
requestID := protocol .GenerateRequestID ()
1169
1178
pubsubTopic = w .getPubsubTopic (pubsubTopic )
1170
1179
0 commit comments