Skip to content

Commit d78064f

Browse files
committed
chore_: remove message ids from query queue after ack
1 parent 86a0f31 commit d78064f

File tree

6 files changed

+39
-0
lines changed

6 files changed

+39
-0
lines changed

eth-node/bridge/geth/waku.go

+3
Original file line numberDiff line numberDiff line change
@@ -314,3 +314,6 @@ func GetWakuFilterFrom(f types.Filter) *wakucommon.Filter {
314314
func (w *wakuFilterWrapper) ID() string {
315315
return w.id
316316
}
317+
318+
func (w *GethWakuWrapper) ConfirmMessageDelivered(hashes []common.Hash) {
319+
}

eth-node/bridge/geth/wakuv2.go

+4
Original file line numberDiff line numberDiff line change
@@ -327,3 +327,7 @@ func GetWakuV2FilterFrom(f types.Filter) *wakucommon.Filter {
327327
func (w *wakuV2FilterWrapper) ID() string {
328328
return w.id
329329
}
330+
331+
func (w *gethWakuV2Wrapper) ConfirmMessageDelivered(hashes []common.Hash) {
332+
w.waku.ConfirmMessageDelivered(hashes)
333+
}

eth-node/types/waku.go

+3
Original file line numberDiff line numberDiff line change
@@ -178,4 +178,7 @@ type Waku interface {
178178

179179
// ClearEnvelopesCache clears waku envelopes cache
180180
ClearEnvelopesCache()
181+
182+
// ConfirmMessageDelivered updates a message has been delivered in waku
183+
ConfirmMessageDelivered(hash []common.Hash)
181184
}

protocol/messenger_peersyncing.go

+2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ func (m *Messenger) markDeliveredMessages(acks [][]byte) {
4343
m.logger.Debug("Can't set message status as delivered", zap.Error(err))
4444
}
4545

46+
m.transport.ConfirmMessageDelivered(messageID)
47+
4648
//send signal to client that message status updated
4749
if m.config.messengerSignalsHandler != nil {
4850
message, err := m.persistence.MessageByID(messageID)

protocol/transport/transport.go

+12
Original file line numberDiff line numberDiff line change
@@ -717,3 +717,15 @@ func (t *Transport) RemovePubsubTopicKey(topic string) error {
717717
}
718718
return nil
719719
}
720+
721+
func (t *Transport) ConfirmMessageDelivered(messageID string) {
722+
hashes, ok := t.envelopesMonitor.identifierHashes[messageID]
723+
if !ok {
724+
return
725+
}
726+
commHashes := make([]common.Hash, len(hashes))
727+
for i, h := range hashes {
728+
commHashes[i] = common.BytesToHash(h[:])
729+
}
730+
t.waku.ConfirmMessageDelivered(commHashes)
731+
}

wakuv2/waku.go

+15
Original file line numberDiff line numberDiff line change
@@ -1042,6 +1042,21 @@ func (w *Waku) checkIfMessagesStored() {
10421042
}
10431043
}
10441044

1045+
func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) {
1046+
w.sendMsgIDsMu.Lock()
1047+
defer w.sendMsgIDsMu.Unlock()
1048+
for pubsubTopic, subMsgs := range w.sendMsgIDs {
1049+
for _, hash := range hashes {
1050+
delete(subMsgs, hash)
1051+
if len(subMsgs) == 0 {
1052+
delete(w.sendMsgIDs, pubsubTopic)
1053+
} else {
1054+
w.sendMsgIDs[pubsubTopic] = subMsgs
1055+
}
1056+
}
1057+
}
1058+
}
1059+
10451060
type publishFn = func(envelope *protocol.Envelope, logger *zap.Logger) error
10461061

10471062
func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publishFn, logger *zap.Logger) {

0 commit comments

Comments
 (0)