diff --git a/messaging/adapters/persistence.go b/messaging/adapters/persistence.go index 30de9d78097..595d0f9977c 100644 --- a/messaging/adapters/persistence.go +++ b/messaging/adapters/persistence.go @@ -1,8 +1,11 @@ package adapters import ( + "crypto/ecdsa" + "github.com/status-im/status-go/messaging/layers/transport" "github.com/status-im/status-go/messaging/types" + wakupersistence "github.com/status-im/status-go/messaging/waku/persistence" ) type KeysPersistence struct { @@ -37,3 +40,36 @@ func (pm *ProcessedMessageIDsCache) Add(ids []string, timestamp uint64) error { func (pm *ProcessedMessageIDsCache) Clean(timestamp uint64) error { return pm.P.MessageCacheClearOlderThan(timestamp) } + +type WakuProtectedTopics struct { + P types.Persistence +} + +var _ wakupersistence.ProtectedTopics = (*WakuProtectedTopics)(nil) + +func (wpt *WakuProtectedTopics) Insert(pubsubTopic string, privKey *ecdsa.PrivateKey, publicKey *ecdsa.PublicKey) error { + return wpt.P.WakuInsertProtectedTopic(pubsubTopic, privKey, publicKey) +} + +func (wpt *WakuProtectedTopics) Delete(pubsubTopic string) error { + return wpt.P.WakuDeleteProtectedTopic(pubsubTopic) +} + +func (wpt *WakuProtectedTopics) FetchPrivateKey(topic string) (*ecdsa.PrivateKey, error) { + return wpt.P.WakuFetchPrivateKeyForProtectedTopic(topic) +} + +func (wpt *WakuProtectedTopics) ProtectedTopics() ([]wakupersistence.ProtectedTopic, error) { + pt, err := wpt.P.WakuProtectedTopics() + if err != nil { + return nil, err + } + result := make([]wakupersistence.ProtectedTopic, len(pt)) + for i, p := range pt { + result[i] = wakupersistence.ProtectedTopic{ + PubKey: p.PubKey, + Topic: p.Topic, + } + } + return result, nil +} diff --git a/messaging/common/message_sender_test.go b/messaging/common/message_sender_test.go index 814a91300b8..03c698701df 100644 --- a/messaging/common/message_sender_test.go +++ b/messaging/common/message_sender_test.go @@ -77,7 +77,7 @@ func (s *MessageSenderSuite) SetupTest() { nil, &wakuConfig, s.logger, - database, + nil, nil, func([]byte, peer.AddrInfo, error) {}, nil, diff --git a/messaging/common/persistence_stub_test.go b/messaging/common/persistence_stub_test.go index ead0d4dbfe9..f939ca3821d 100644 --- a/messaging/common/persistence_stub_test.go +++ b/messaging/common/persistence_stub_test.go @@ -29,6 +29,8 @@ type StubPersistence struct { completedSegments map[string]struct{} // hash } +var _ types.Persistence = (*StubPersistence)(nil) + func NewStubPersistence() *StubPersistence { return &StubPersistence{ wakuKeys: make(map[string][]byte), @@ -244,21 +246,33 @@ func (s *StubPersistence) CompleteMessageSegments(hash []byte, sigPubKey *ecdsa. } func (s *StubPersistence) DeleteHashRatchetMessagesOlderThan(timestamp int64) error { - // Not implemented for stub return nil } func (s *StubPersistence) InsertPendingConfirmation(*types.RawMessageConfirmation) error { - // Not implemented for stub return nil } func (s *StubPersistence) RemoveMessageSegmentsOlderThan(timestamp int64) error { - // Not implemented for stub return nil } func (s *StubPersistence) RemoveMessageSegmentsCompletedOlderThan(timestamp int64) error { - // Not implemented for stub return nil } + +func (s *StubPersistence) WakuInsertProtectedTopic(pubsubTopic string, privKey *ecdsa.PrivateKey, publicKey *ecdsa.PublicKey) error { + return nil +} + +func (s *StubPersistence) WakuDeleteProtectedTopic(pubsubTopic string) error { + return nil +} + +func (s *StubPersistence) WakuFetchPrivateKeyForProtectedTopic(topic string) (*ecdsa.PrivateKey, error) { + return nil, nil +} + +func (s *StubPersistence) WakuProtectedTopics() ([]types.ProtectedTopic, error) { + return nil, nil +} diff --git a/messaging/core.go b/messaging/core.go index afc5fc8ca30..572022523ab 100644 --- a/messaging/core.go +++ b/messaging/core.go @@ -125,7 +125,7 @@ func NewCore(params CoreParams, options ...Options) (*Core, error) { config := newConfig(options...) waku, err := newWaku(wakuParams{ - db: params.DB, + persistence: params.Persistence, identity: params.Identity, nodeKey: params.NodeKey, wakuConfig: params.WakuConfig, @@ -369,7 +369,7 @@ func (c *Core) startCleanupLoop(name string, cleanupFunc func() error) { } type wakuParams struct { - db *sql.DB + persistence types.Persistence identity *ecdsa.PrivateKey nodeKey *ecdsa.PrivateKey @@ -425,7 +425,7 @@ func newWaku(params wakuParams) (*wakuv2.Waku, error) { params.nodeKey, cfg, params.logger, - params.db, + &adapters.WakuProtectedTopics{P: params.persistence}, params.timeSource, params.onHistoricMessagesRequestFailed, params.onPeerStats, diff --git a/messaging/types/persistence.go b/messaging/types/persistence.go index f65bb26bb18..2d06b799d58 100644 --- a/messaging/types/persistence.go +++ b/messaging/types/persistence.go @@ -3,8 +3,7 @@ package types import "crypto/ecdsa" type Persistence interface { - WakuKeys() (map[string][]byte, error) - AddWakuKey(chatID string, key []byte) error + wakuPersistence MessageCacheAdd(ids []string, timestamp uint64) error MessageCacheClear() error @@ -24,3 +23,17 @@ type Persistence interface { RemoveMessageSegmentsOlderThan(timestamp int64) error RemoveMessageSegmentsCompletedOlderThan(timestamp int64) error } + +type ProtectedTopic struct { + PubKey *ecdsa.PublicKey + Topic string +} + +type wakuPersistence interface { + WakuKeys() (map[string][]byte, error) + AddWakuKey(chatID string, key []byte) error + WakuInsertProtectedTopic(pubsubTopic string, privKey *ecdsa.PrivateKey, publicKey *ecdsa.PublicKey) error + WakuDeleteProtectedTopic(pubsubTopic string) error + WakuFetchPrivateKeyForProtectedTopic(topic string) (*ecdsa.PrivateKey, error) + WakuProtectedTopics() ([]ProtectedTopic, error) +} diff --git a/messaging/waku/gowaku.go b/messaging/waku/gowaku.go index 7ed1f057786..53346e839e4 100644 --- a/messaging/waku/gowaku.go +++ b/messaging/waku/gowaku.go @@ -154,7 +154,7 @@ type Waku struct { bandwidthCounter *metrics.BandwidthCounter - protectedTopicStore *persistence.ProtectedTopicsStore + protectedTopicStore persistence.ProtectedTopics sendQueue *publish.MessageQueue @@ -221,7 +221,7 @@ func newTTLCache() *ttlcache.Cache[gethcommon.Hash, bool] { } // New creates a WakuV2 client ready to communicate through the LibP2P network. -func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, appDB *sql.DB, ts timesource.TimeSource, onHistoricMessagesRequestFailed func([]byte, peer.AddrInfo, error), onPeerStats func(types.ConnStatus)) (*Waku, error) { +func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, protectedTopicsPersistence persistence.ProtectedTopics, ts timesource.TimeSource, onHistoricMessagesRequestFailed func([]byte, peer.AddrInfo, error), onPeerStats func(types.ConnStatus)) (*Waku, error) { var err error if logger == nil { logger, err = zap.NewDevelopment() @@ -244,7 +244,6 @@ func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, appDB *sql. ctx, cancel := context.WithCancel(context.Background()) waku := &Waku{ - appDB: appDB, cfg: cfg, privateKeys: make(map[string]*ecdsa.PrivateKey), symKeys: make(map[string][]byte), @@ -267,6 +266,7 @@ func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, appDB *sql. onPeerStats: onPeerStats, onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker), sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish), + protectedTopicStore: protectedTopicsPersistence, } waku.filters = common.NewFilters(waku.cfg.DefaultShardPubsubTopic, waku.logger) @@ -348,13 +348,6 @@ func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, appDB *sql. opts = append(opts, node.WithLightPush(lightpush.WithRateLimiter(5, 10))) } - if appDB != nil { - waku.protectedTopicStore, err = persistence.NewProtectedTopicsStore(logger, appDB) - if err != nil { - return nil, err - } - } - if cfg.EnablePeerExchangeServer { opts = append(opts, node.WithPeerExchange(peer_exchange.WithRateLimiter(1, 1))) } @@ -1399,13 +1392,6 @@ func (w *Waku) Stop() error { w.node.Stop() - if w.protectedTopicStore != nil { - err := w.protectedTopicStore.Close() - if err != nil { - return err - } - } - close(w.goingOnline) w.wg.Wait() diff --git a/messaging/waku/nwaku.go b/messaging/waku/nwaku.go index 0987ad5baed..a607e41a36c 100644 --- a/messaging/waku/nwaku.go +++ b/messaging/waku/nwaku.go @@ -9,7 +9,6 @@ import ( "context" "crypto/ecdsa" "crypto/sha256" - "database/sql" "encoding/hex" "errors" "fmt" @@ -108,8 +107,6 @@ type IMetricsHandler interface { type Waku struct { node *waku.WakuNode - appDB *sql.DB - dnsAddressCache map[string][]dnsdisc.DiscoveredNode // Map to store the multiaddresses returned by dns discovery dnsAddressCacheLock *sync.RWMutex // lock to handle access to the map dnsDiscAsyncRetrievedSignal chan struct{} @@ -127,7 +124,7 @@ type Waku struct { bandwidthCounter *metrics.BandwidthCounter - protectedTopicStore *persistence.ProtectedTopicsStore + protectedTopicStore persistence.ProtectedTopics sendQueue *publish.MessageQueue @@ -207,7 +204,7 @@ func newTTLCache() *ttlcache.Cache[gethcommon.Hash, bool] { } // New creates a WakuV2 client ready to communicate through the LibP2P network. -func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, appDB *sql.DB, ts timesource.TimeSource, onHistoricMessagesRequestFailed func([]byte, peer.AddrInfo, error), onPeerStats func(types.ConnStatus)) (*Waku, error) { +func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, protectedTopicsPersistence persistence.ProtectedTopics, ts timesource.TimeSource, onHistoricMessagesRequestFailed func([]byte, peer.AddrInfo, error), onPeerStats func(types.ConnStatus)) (*Waku, error) { var err error if logger == nil { logger, err = zap.NewDevelopment() @@ -257,7 +254,6 @@ func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, appDB *sql. waku := &Waku{ node: wakunode, - appDB: appDB, cfg: cfg, privateKeys: make(map[string]*ecdsa.PrivateKey), symKeys: make(map[string][]byte), @@ -281,6 +277,7 @@ func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, appDB *sql. onPeerStats: onPeerStats, onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker), sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish), + protectedTopicStore: protectedTopicsPersistence, } waku.filters = common.NewFilters(waku.cfg.DefaultShardPubsubTopic, waku.logger) @@ -1072,13 +1069,6 @@ func (w *Waku) Stop() error { return err } - if w.protectedTopicStore != nil { - err := w.protectedTopicStore.Close() - if err != nil { - return err - } - } - close(w.goingOnline) w.wg.Wait() diff --git a/messaging/waku/persistence/protected_topics.go b/messaging/waku/persistence/protected_topics.go new file mode 100644 index 00000000000..bf641e0f594 --- /dev/null +++ b/messaging/waku/persistence/protected_topics.go @@ -0,0 +1,17 @@ +package persistence + +import ( + "crypto/ecdsa" +) + +type ProtectedTopics interface { + Insert(pubsubTopic string, privKey *ecdsa.PrivateKey, publicKey *ecdsa.PublicKey) error + Delete(pubsubTopic string) error + FetchPrivateKey(topic string) (*ecdsa.PrivateKey, error) + ProtectedTopics() ([]ProtectedTopic, error) +} + +type ProtectedTopic struct { + PubKey *ecdsa.PublicKey + Topic string +} diff --git a/messaging/waku/persistence/signed_messages.go b/messaging/waku/persistence/signed_messages.go deleted file mode 100644 index 8a1061105cc..00000000000 --- a/messaging/waku/persistence/signed_messages.go +++ /dev/null @@ -1,125 +0,0 @@ -package persistence - -import ( - "crypto/ecdsa" - "database/sql" - "errors" - - "go.uber.org/zap" - - "github.com/status-im/status-go/crypto" -) - -// DBStore is a MessageProvider that has a *sql.DB connection -type ProtectedTopicsStore struct { - db *sql.DB - log *zap.Logger - - insertStmt *sql.Stmt - fetchPrivKeyStmt *sql.Stmt - deleteStmt *sql.Stmt -} - -// Creates a new DB store using the db specified via options. -// It will create a messages table if it does not exist and -// clean up records according to the retention policy used -func NewProtectedTopicsStore(log *zap.Logger, db *sql.DB) (*ProtectedTopicsStore, error) { - insertStmt, err := db.Prepare("INSERT OR REPLACE INTO pubsubtopic_signing_key (topic, priv_key, pub_key) VALUES (?, ?, ?)") - if err != nil { - return nil, err - } - - fetchPrivKeyStmt, err := db.Prepare("SELECT priv_key FROM pubsubtopic_signing_key WHERE topic = ?") - if err != nil { - return nil, err - } - - deleteStmt, err := db.Prepare("DELETE FROM pubsubtopic_signing_key WHERE topic = ?") - if err != nil { - return nil, err - } - - result := new(ProtectedTopicsStore) - result.log = log.Named("protected-topics-store") - result.db = db - result.insertStmt = insertStmt - result.fetchPrivKeyStmt = fetchPrivKeyStmt - result.deleteStmt = deleteStmt - - return result, nil -} - -func (p *ProtectedTopicsStore) Close() error { - err := p.insertStmt.Close() - if err != nil { - return err - } - - return p.fetchPrivKeyStmt.Close() -} - -func (p *ProtectedTopicsStore) Insert(pubsubTopic string, privKey *ecdsa.PrivateKey, publicKey *ecdsa.PublicKey) error { - var privKeyBytes []byte - if privKey != nil { - privKeyBytes = crypto.FromECDSA(privKey) - } - - pubKeyBytes := crypto.FromECDSAPub(publicKey) - - _, err := p.insertStmt.Exec(pubsubTopic, privKeyBytes, pubKeyBytes) - - return err -} - -func (p *ProtectedTopicsStore) Delete(pubsubTopic string) error { - _, err := p.deleteStmt.Exec(pubsubTopic) - return err -} - -func (p *ProtectedTopicsStore) FetchPrivateKey(topic string) (privKey *ecdsa.PrivateKey, err error) { - var privKeyBytes []byte - err = p.fetchPrivKeyStmt.QueryRow(topic).Scan(&privKeyBytes) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return nil, nil - } - return nil, err - } - - return crypto.ToECDSA(privKeyBytes) -} - -type ProtectedTopic struct { - PubKey *ecdsa.PublicKey - Topic string -} - -func (p *ProtectedTopicsStore) ProtectedTopics() ([]ProtectedTopic, error) { - rows, err := p.db.Query("SELECT pub_key, topic FROM pubsubtopic_signing_key") - if err != nil { - return nil, err - } - defer rows.Close() - - var result []ProtectedTopic - for rows.Next() { - var pubKeyBytes []byte - var topic string - err := rows.Scan(&pubKeyBytes, &topic) - if err != nil { - return nil, err - } - - pubk, err := crypto.UnmarshalPubkey(pubKeyBytes) - if err != nil { - return nil, err - } - - result = append(result, ProtectedTopic{ - PubKey: pubk, - Topic: topic, - }) - } - - return result, nil -} diff --git a/protocol/messaging_persistence.go b/protocol/messaging_persistence.go index 03969d1b714..bf1169d5365 100644 --- a/protocol/messaging_persistence.go +++ b/protocol/messaging_persistence.go @@ -4,6 +4,7 @@ import ( "context" "crypto/ecdsa" "database/sql" + "errors" "fmt" "strings" "time" @@ -374,3 +375,62 @@ func (c *messagingPersistence) MarkAsConfirmed(dataSyncID []byte, atLeastOne boo return } + +func (c *messagingPersistence) WakuInsertProtectedTopic(pubsubTopic string, privKey *ecdsa.PrivateKey, publicKey *ecdsa.PublicKey) error { + var privKeyBytes []byte + if privKey != nil { + privKeyBytes = crypto.FromECDSA(privKey) + } + pubKeyBytes := crypto.FromECDSAPub(publicKey) + + _, err := c.db.Exec("INSERT OR REPLACE INTO pubsubtopic_signing_key (topic, priv_key, pub_key) VALUES (?, ?, ?)", + pubsubTopic, privKeyBytes, pubKeyBytes) + return err +} + +func (c *messagingPersistence) WakuDeleteProtectedTopic(pubsubTopic string) error { + _, err := c.db.Exec("DELETE FROM pubsubtopic_signing_key WHERE topic = ?", pubsubTopic) + return err +} + +func (c *messagingPersistence) WakuFetchPrivateKeyForProtectedTopic(topic string) (*ecdsa.PrivateKey, error) { + var privKeyBytes []byte + err := c.db.QueryRow("SELECT priv_key FROM pubsubtopic_signing_key WHERE topic = ?", topic).Scan(&privKeyBytes) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, err + } + return crypto.ToECDSA(privKeyBytes) +} + +func (c *messagingPersistence) WakuProtectedTopics() ([]types.ProtectedTopic, error) { + rows, err := c.db.Query("SELECT pub_key, topic FROM pubsubtopic_signing_key") + if err != nil { + return nil, err + } + defer rows.Close() + + var result []types.ProtectedTopic + for rows.Next() { + var pubKeyBytes []byte + var topic string + err := rows.Scan(&pubKeyBytes, &topic) + if err != nil { + return nil, err + } + + pubk, err := crypto.UnmarshalPubkey(pubKeyBytes) + if err != nil { + return nil, err + } + + result = append(result, types.ProtectedTopic{ + PubKey: pubk, + Topic: topic, + }) + } + + return result, nil +} diff --git a/protocol/messaging_persistence_test.go b/protocol/messaging_persistence_test.go index e2c1d016131..f81ba26c622 100644 --- a/protocol/messaging_persistence_test.go +++ b/protocol/messaging_persistence_test.go @@ -5,6 +5,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/status-im/status-go/crypto" "github.com/status-im/status-go/crypto/types" messagingtypes "github.com/status-im/status-go/messaging/types" ) @@ -206,3 +207,45 @@ func TestDeleteHashRatchetMessage(t *testing.T) { require.NotNil(t, fetchedMessages) require.Len(t, fetchedMessages, 1) } + +func TestWakuProtectedTopicPersistence(t *testing.T) { + db, err := openTestDB() + require.NoError(t, err) + p := NewMessagingPersistence(db) + + // Generate ECDSA keys + privKey, err := crypto.GenerateKey() + require.NoError(t, err) + pubKey := &privKey.PublicKey + + pubsubTopic := "test-topic" + + // Insert protected topic + err = p.WakuInsertProtectedTopic(pubsubTopic, privKey, pubKey) + require.NoError(t, err) + + // Fetch private key for topic + fetchedPrivKey, err := p.WakuFetchPrivateKeyForProtectedTopic(pubsubTopic) + require.NoError(t, err) + require.NotNil(t, fetchedPrivKey) + require.Equal(t, privKey.D.Bytes(), fetchedPrivKey.D.Bytes()) + + // Fetch protected topics + topics, err := p.WakuProtectedTopics() + require.NoError(t, err) + require.Len(t, topics, 1) + require.Equal(t, pubsubTopic, topics[0].Topic) + + // Delete protected topic + err = p.WakuDeleteProtectedTopic(pubsubTopic) + require.NoError(t, err) + + // Ensure topic is deleted + topics, err = p.WakuProtectedTopics() + require.NoError(t, err) + require.Len(t, topics, 0) + + fetchedPrivKey, err = p.WakuFetchPrivateKeyForProtectedTopic(pubsubTopic) + require.NoError(t, err) + require.Nil(t, fetchedPrivKey) +}