Skip to content

Commit 9348690

Browse files
committed
refactor: use protected topics persistence interface
iterates: #6792
1 parent a384854 commit 9348690

File tree

10 files changed

+156
-165
lines changed

10 files changed

+156
-165
lines changed

messaging/adapters/persistence.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package adapters
22

33
import (
4+
"crypto/ecdsa"
5+
46
"github.com/status-im/status-go/messaging/layers/transport"
57
"github.com/status-im/status-go/messaging/types"
8+
wakupersistence "github.com/status-im/status-go/messaging/waku/persistence"
69
)
710

811
type KeysPersistence struct {
@@ -37,3 +40,36 @@ func (pm *ProcessedMessageIDsCache) Add(ids []string, timestamp uint64) error {
3740
func (pm *ProcessedMessageIDsCache) Clean(timestamp uint64) error {
3841
return pm.P.MessageCacheClearOlderThan(timestamp)
3942
}
43+
44+
type WakuProtectedTopics struct {
45+
P types.Persistence
46+
}
47+
48+
var _ wakupersistence.ProtectedTopics = (*WakuProtectedTopics)(nil)
49+
50+
func (wpt *WakuProtectedTopics) Insert(pubsubTopic string, privKey *ecdsa.PrivateKey, publicKey *ecdsa.PublicKey) error {
51+
return wpt.P.WakuInsertProtectedTopic(pubsubTopic, privKey, publicKey)
52+
}
53+
54+
func (wpt *WakuProtectedTopics) Delete(pubsubTopic string) error {
55+
return wpt.P.WakuDeleteProtectedTopic(pubsubTopic)
56+
}
57+
58+
func (wpt *WakuProtectedTopics) FetchPrivateKey(topic string) (*ecdsa.PrivateKey, error) {
59+
return wpt.P.WakuFetchPrivateKeyForProtectedTopic(topic)
60+
}
61+
62+
func (wpt *WakuProtectedTopics) ProtectedTopics() ([]wakupersistence.ProtectedTopic, error) {
63+
pt, err := wpt.P.WakuProtectedTopics()
64+
if err != nil {
65+
return nil, err
66+
}
67+
result := make([]wakupersistence.ProtectedTopic, len(pt))
68+
for i, p := range pt {
69+
result[i] = wakupersistence.ProtectedTopic{
70+
PubKey: p.PubKey,
71+
Topic: p.Topic,
72+
}
73+
}
74+
return result, nil
75+
}

messaging/common/message_sender_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func (s *MessageSenderSuite) SetupTest() {
7777
nil,
7878
&wakuConfig,
7979
s.logger,
80-
database,
80+
nil,
8181
nil,
8282
func([]byte, peer.AddrInfo, error) {},
8383
nil,

messaging/common/persistence_stub_test.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ type StubPersistence struct {
2929
completedSegments map[string]struct{} // hash
3030
}
3131

32+
var _ types.Persistence = (*StubPersistence)(nil)
33+
3234
func NewStubPersistence() *StubPersistence {
3335
return &StubPersistence{
3436
wakuKeys: make(map[string][]byte),
@@ -244,21 +246,33 @@ func (s *StubPersistence) CompleteMessageSegments(hash []byte, sigPubKey *ecdsa.
244246
}
245247

246248
func (s *StubPersistence) DeleteHashRatchetMessagesOlderThan(timestamp int64) error {
247-
// Not implemented for stub
248249
return nil
249250
}
250251

251252
func (s *StubPersistence) InsertPendingConfirmation(*types.RawMessageConfirmation) error {
252-
// Not implemented for stub
253253
return nil
254254
}
255255

256256
func (s *StubPersistence) RemoveMessageSegmentsOlderThan(timestamp int64) error {
257-
// Not implemented for stub
258257
return nil
259258
}
260259

261260
func (s *StubPersistence) RemoveMessageSegmentsCompletedOlderThan(timestamp int64) error {
262-
// Not implemented for stub
263261
return nil
264262
}
263+
264+
func (s *StubPersistence) WakuInsertProtectedTopic(pubsubTopic string, privKey *ecdsa.PrivateKey, publicKey *ecdsa.PublicKey) error {
265+
return nil
266+
}
267+
268+
func (s *StubPersistence) WakuDeleteProtectedTopic(pubsubTopic string) error {
269+
return nil
270+
}
271+
272+
func (s *StubPersistence) WakuFetchPrivateKeyForProtectedTopic(topic string) (*ecdsa.PrivateKey, error) {
273+
return nil, nil
274+
}
275+
276+
func (s *StubPersistence) WakuProtectedTopics() ([]types.ProtectedTopic, error) {
277+
return nil, nil
278+
}

messaging/core.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func NewCore(params CoreParams, options ...Options) (*Core, error) {
125125
config := newConfig(options...)
126126

127127
waku, err := newWaku(wakuParams{
128-
db: params.DB,
128+
persistence: params.Persistence,
129129
identity: params.Identity,
130130
nodeKey: params.NodeKey,
131131
wakuConfig: params.WakuConfig,
@@ -369,7 +369,7 @@ func (c *Core) startCleanupLoop(name string, cleanupFunc func() error) {
369369
}
370370

371371
type wakuParams struct {
372-
db *sql.DB
372+
persistence types.Persistence
373373

374374
identity *ecdsa.PrivateKey
375375
nodeKey *ecdsa.PrivateKey
@@ -425,7 +425,7 @@ func newWaku(params wakuParams) (*wakuv2.Waku, error) {
425425
params.nodeKey,
426426
cfg,
427427
params.logger,
428-
params.db,
428+
&adapters.WakuProtectedTopics{P: params.persistence},
429429
params.timeSource,
430430
params.onHistoricMessagesRequestFailed,
431431
params.onPeerStats,

messaging/types/persistence.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ package types
33
import "crypto/ecdsa"
44

55
type Persistence interface {
6-
WakuKeys() (map[string][]byte, error)
7-
AddWakuKey(chatID string, key []byte) error
6+
wakuPersistence
87

98
MessageCacheAdd(ids []string, timestamp uint64) error
109
MessageCacheClear() error
@@ -24,3 +23,17 @@ type Persistence interface {
2423
RemoveMessageSegmentsOlderThan(timestamp int64) error
2524
RemoveMessageSegmentsCompletedOlderThan(timestamp int64) error
2625
}
26+
27+
type ProtectedTopic struct {
28+
PubKey *ecdsa.PublicKey
29+
Topic string
30+
}
31+
32+
type wakuPersistence interface {
33+
WakuKeys() (map[string][]byte, error)
34+
AddWakuKey(chatID string, key []byte) error
35+
WakuInsertProtectedTopic(pubsubTopic string, privKey *ecdsa.PrivateKey, publicKey *ecdsa.PublicKey) error
36+
WakuDeleteProtectedTopic(pubsubTopic string) error
37+
WakuFetchPrivateKeyForProtectedTopic(topic string) (*ecdsa.PrivateKey, error)
38+
WakuProtectedTopics() ([]ProtectedTopic, error)
39+
}

messaging/waku/gowaku.go

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ type Waku struct {
154154

155155
bandwidthCounter *metrics.BandwidthCounter
156156

157-
protectedTopicStore *persistence.ProtectedTopicsStore
157+
protectedTopicStore persistence.ProtectedTopics
158158

159159
sendQueue *publish.MessageQueue
160160

@@ -221,7 +221,7 @@ func newTTLCache() *ttlcache.Cache[gethcommon.Hash, bool] {
221221
}
222222

223223
// New creates a WakuV2 client ready to communicate through the LibP2P network.
224-
func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, appDB *sql.DB, ts timesource.TimeSource, onHistoricMessagesRequestFailed func([]byte, peer.AddrInfo, error), onPeerStats func(types.ConnStatus)) (*Waku, error) {
224+
func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, protectedTopicsPersistence persistence.ProtectedTopics, ts timesource.TimeSource, onHistoricMessagesRequestFailed func([]byte, peer.AddrInfo, error), onPeerStats func(types.ConnStatus)) (*Waku, error) {
225225
var err error
226226
if logger == nil {
227227
logger, err = zap.NewDevelopment()
@@ -244,7 +244,6 @@ func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, appDB *sql.
244244
ctx, cancel := context.WithCancel(context.Background())
245245

246246
waku := &Waku{
247-
appDB: appDB,
248247
cfg: cfg,
249248
privateKeys: make(map[string]*ecdsa.PrivateKey),
250249
symKeys: make(map[string][]byte),
@@ -267,6 +266,7 @@ func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, appDB *sql.
267266
onPeerStats: onPeerStats,
268267
onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker),
269268
sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish),
269+
protectedTopicStore: protectedTopicsPersistence,
270270
}
271271

272272
waku.filters = common.NewFilters(waku.cfg.DefaultShardPubsubTopic, waku.logger)
@@ -348,13 +348,6 @@ func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, appDB *sql.
348348
opts = append(opts, node.WithLightPush(lightpush.WithRateLimiter(5, 10)))
349349
}
350350

351-
if appDB != nil {
352-
waku.protectedTopicStore, err = persistence.NewProtectedTopicsStore(logger, appDB)
353-
if err != nil {
354-
return nil, err
355-
}
356-
}
357-
358351
if cfg.EnablePeerExchangeServer {
359352
opts = append(opts, node.WithPeerExchange(peer_exchange.WithRateLimiter(1, 1)))
360353
}
@@ -1399,13 +1392,6 @@ func (w *Waku) Stop() error {
13991392

14001393
w.node.Stop()
14011394

1402-
if w.protectedTopicStore != nil {
1403-
err := w.protectedTopicStore.Close()
1404-
if err != nil {
1405-
return err
1406-
}
1407-
}
1408-
14091395
close(w.goingOnline)
14101396
w.wg.Wait()
14111397

messaging/waku/nwaku.go

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"context"
1010
"crypto/ecdsa"
1111
"crypto/sha256"
12-
"database/sql"
1312
"encoding/hex"
1413
"errors"
1514
"fmt"
@@ -108,8 +107,6 @@ type IMetricsHandler interface {
108107
type Waku struct {
109108
node *waku.WakuNode
110109

111-
appDB *sql.DB
112-
113110
dnsAddressCache map[string][]dnsdisc.DiscoveredNode // Map to store the multiaddresses returned by dns discovery
114111
dnsAddressCacheLock *sync.RWMutex // lock to handle access to the map
115112
dnsDiscAsyncRetrievedSignal chan struct{}
@@ -127,7 +124,7 @@ type Waku struct {
127124

128125
bandwidthCounter *metrics.BandwidthCounter
129126

130-
protectedTopicStore *persistence.ProtectedTopicsStore
127+
protectedTopicStore persistence.ProtectedTopics
131128

132129
sendQueue *publish.MessageQueue
133130

@@ -207,7 +204,7 @@ func newTTLCache() *ttlcache.Cache[gethcommon.Hash, bool] {
207204
}
208205

209206
// New creates a WakuV2 client ready to communicate through the LibP2P network.
210-
func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, appDB *sql.DB, ts timesource.TimeSource, onHistoricMessagesRequestFailed func([]byte, peer.AddrInfo, error), onPeerStats func(types.ConnStatus)) (*Waku, error) {
207+
func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, protectedTopicsPersistence persistence.ProtectedTopics, ts timesource.TimeSource, onHistoricMessagesRequestFailed func([]byte, peer.AddrInfo, error), onPeerStats func(types.ConnStatus)) (*Waku, error) {
211208
var err error
212209
if logger == nil {
213210
logger, err = zap.NewDevelopment()
@@ -257,7 +254,6 @@ func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, appDB *sql.
257254

258255
waku := &Waku{
259256
node: wakunode,
260-
appDB: appDB,
261257
cfg: cfg,
262258
privateKeys: make(map[string]*ecdsa.PrivateKey),
263259
symKeys: make(map[string][]byte),
@@ -281,6 +277,7 @@ func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, appDB *sql.
281277
onPeerStats: onPeerStats,
282278
onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker),
283279
sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish),
280+
protectedTopicStore: protectedTopicsPersistence,
284281
}
285282

286283
waku.filters = common.NewFilters(waku.cfg.DefaultShardPubsubTopic, waku.logger)
@@ -1072,13 +1069,6 @@ func (w *Waku) Stop() error {
10721069
return err
10731070
}
10741071

1075-
if w.protectedTopicStore != nil {
1076-
err := w.protectedTopicStore.Close()
1077-
if err != nil {
1078-
return err
1079-
}
1080-
}
1081-
10821072
close(w.goingOnline)
10831073

10841074
w.wg.Wait()
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package persistence
2+
3+
import (
4+
"crypto/ecdsa"
5+
)
6+
7+
type ProtectedTopics interface {
8+
Insert(pubsubTopic string, privKey *ecdsa.PrivateKey, publicKey *ecdsa.PublicKey) error
9+
Delete(pubsubTopic string) error
10+
FetchPrivateKey(topic string) (*ecdsa.PrivateKey, error)
11+
ProtectedTopics() ([]ProtectedTopic, error)
12+
}
13+
14+
type ProtectedTopic struct {
15+
PubKey *ecdsa.PublicKey
16+
Topic string
17+
}

0 commit comments

Comments
 (0)