diff --git a/wakuv2/api.go b/wakuv2/api.go index f106b32f52b..db03bbd95a7 100644 --- a/wakuv2/api.go +++ b/wakuv2/api.go @@ -18,6 +18,7 @@ package wakuv2 +/* TODO-nwaku import ( "context" "crypto/ecdsa" @@ -513,4 +514,4 @@ func (api *PublicWakuAPI) NewMessageFilter(req Criteria) (string, error) { api.mu.Unlock() return id, nil -} +} */ \ No newline at end of file diff --git a/wakuv2/api_test.go b/wakuv2/api_test.go index 7a060bf5fd3..10d16a4c332 100644 --- a/wakuv2/api_test.go +++ b/wakuv2/api_test.go @@ -18,6 +18,7 @@ package wakuv2 +/* TODO-nwaku import ( "testing" "time" @@ -67,4 +68,4 @@ func TestMultipleTopicCopyInNewMessageFilter(t *testing.T) { if !found { t.Fatalf("Could not find filter with both topics") } -} +} */ \ No newline at end of file diff --git a/wakuv2/message_publishing.go b/wakuv2/message_publishing.go index 25f8f57d83f..4b9d0f667ec 100644 --- a/wakuv2/message_publishing.go +++ b/wakuv2/message_publishing.go @@ -1,5 +1,6 @@ package wakuv2 +/* TODO-nwaku import ( "errors" @@ -160,4 +161,4 @@ func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publish.Pu }) } } -} +} */ \ No newline at end of file diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 0fcb0b236fa..a1d70379817 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -4,8 +4,8 @@ package wakuv2 /* - #cgo LDFLAGS: -L../third_party/nwaku/vendor/negentropy/cpp/ -lnegentropy -L../third_party/nwaku/build/ -lwaku -lm -ldl -pthread -lminiupnpc -L../third_party/nwaku/vendor/nim-nat-traversal/vendor/miniupnp/miniupnpc/build/ -lnatpmp -L../third_party/nwaku/vendor/nim-nat-traversal/vendor/libnatpmp-upstream/ -L../third_party/nwaku/vendor/nim-libbacktrace/install/usr/lib/ -lbacktrace -Wl,--allow-multiple-definition - #cgo LDFLAGS: -Wl,-rpath,../third_party/nwaku/build/ + #cgo LDFLAGS: -L../third_party/nwaku/build/ -lnegentropy -lwaku + #cgo LDFLAGS: -L../third_party/nwaku -Wl,-rpath,../third_party/nwaku/build/ #include "../third_party/nwaku/library/libwaku.h" #include @@ -19,17 +19,17 @@ package wakuv2 size_t len; } Resp; - void* allocResp() { + static void* allocResp() { return calloc(1, sizeof(Resp)); } - void freeResp(void* resp) { + static void freeResp(void* resp) { if (resp != NULL) { free(resp); } } - char* getMyCharPtr(void* resp) { + static char* getMyCharPtr(void* resp) { if (resp == NULL) { return NULL; } @@ -37,7 +37,7 @@ package wakuv2 return m->msg; } - size_t getMyCharLen(void* resp) { + static size_t getMyCharLen(void* resp) { if (resp == NULL) { return 0; } @@ -45,7 +45,7 @@ package wakuv2 return m->len; } - int getRet(void* resp) { + static int getRet(void* resp) { if (resp == NULL) { return 0; } @@ -54,7 +54,7 @@ package wakuv2 } // resp must be set != NULL in case interest on retrieving data from the callback - void callback(int ret, char* msg, size_t len, void* resp) { + static void callback(int ret, char* msg, size_t len, void* resp) { if (resp != NULL) { Resp* m = (Resp*) resp; m->ret = ret; @@ -72,37 +72,37 @@ package wakuv2 } \ } while (0) - void* cGoWakuNew(const char* configJson, void* resp) { + static void* cGoWakuNew(const char* configJson, void* resp) { // We pass NULL because we are not interested in retrieving data from this callback void* ret = waku_new(configJson, (WakuCallBack) callback, resp); return ret; } - void cGoWakuStart(void* wakuCtx, void* resp) { + static void cGoWakuStart(void* wakuCtx, void* resp) { WAKU_CALL(waku_start(wakuCtx, (WakuCallBack) callback, resp)); } - void cGoWakuStop(void* wakuCtx, void* resp) { + static void cGoWakuStop(void* wakuCtx, void* resp) { WAKU_CALL(waku_stop(wakuCtx, (WakuCallBack) callback, resp)); } - void cGoWakuDestroy(void* wakuCtx, void* resp) { + static void cGoWakuDestroy(void* wakuCtx, void* resp) { WAKU_CALL(waku_destroy(wakuCtx, (WakuCallBack) callback, resp)); } - void cGoWakuStartDiscV5(void* wakuCtx, void* resp) { + static void cGoWakuStartDiscV5(void* wakuCtx, void* resp) { WAKU_CALL(waku_start_discv5(wakuCtx, (WakuCallBack) callback, resp)); } - void cGoWakuStopDiscV5(void* wakuCtx, void* resp) { + static void cGoWakuStopDiscV5(void* wakuCtx, void* resp) { WAKU_CALL(waku_stop_discv5(wakuCtx, (WakuCallBack) callback, resp)); } - void cGoWakuVersion(void* wakuCtx, void* resp) { + static void cGoWakuVersion(void* wakuCtx, void* resp) { WAKU_CALL(waku_version(wakuCtx, (WakuCallBack) callback, resp)); } - void cGoWakuSetEventCallback(void* wakuCtx) { + static void cGoWakuSetEventCallback(void* wakuCtx) { // The 'globalEventCallback' Go function is shared amongst all possible Waku instances. // Given that the 'globalEventCallback' is shared, we pass again the @@ -118,7 +118,7 @@ package wakuv2 waku_set_event_callback(wakuCtx, (WakuCallBack) globalEventCallback, wakuCtx); } - void cGoWakuContentTopic(void* wakuCtx, + static void cGoWakuContentTopic(void* wakuCtx, char* appName, int appVersion, char* contentTopicName, @@ -134,15 +134,15 @@ package wakuv2 resp) ); } - void cGoWakuPubsubTopic(void* wakuCtx, char* topicName, void* resp) { + static void cGoWakuPubsubTopic(void* wakuCtx, char* topicName, void* resp) { WAKU_CALL( waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) callback, resp) ); } - void cGoWakuDefaultPubsubTopic(void* wakuCtx, void* resp) { + static void cGoWakuDefaultPubsubTopic(void* wakuCtx, void* resp) { WAKU_CALL (waku_default_pubsub_topic(wakuCtx, (WakuCallBack) callback, resp)); } - void cGoWakuRelayPublish(void* wakuCtx, + static void cGoWakuRelayPublish(void* wakuCtx, const char* pubSubTopic, const char* jsonWakuMessage, int timeoutMs, @@ -156,14 +156,14 @@ package wakuv2 resp)); } - void cGoWakuRelaySubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { + static void cGoWakuRelaySubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { WAKU_CALL ( waku_relay_subscribe(wakuCtx, pubSubTopic, (WakuCallBack) callback, resp) ); } - void cGoWakuRelayUnsubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { + static void cGoWakuRelayUnsubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { WAKU_CALL ( waku_relay_unsubscribe(wakuCtx, pubSubTopic, @@ -171,7 +171,7 @@ package wakuv2 resp) ); } - void cGoWakuConnect(void* wakuCtx, char* peerMultiAddr, int timeoutMs, void* resp) { + static void cGoWakuConnect(void* wakuCtx, char* peerMultiAddr, int timeoutMs, void* resp) { WAKU_CALL( waku_connect(wakuCtx, peerMultiAddr, timeoutMs, @@ -179,23 +179,44 @@ package wakuv2 resp) ); } - void cGoWakuListenAddresses(void* wakuCtx, void* resp) { + static void cGoWakuDialPeerById(void* wakuCtx, + char* peerId, + char* protocol, + int timeoutMs, + void* resp) { + + WAKU_CALL( waku_dial_peer_by_id(wakuCtx, + peerId, + protocol, + timeoutMs, + (WakuCallBack) callback, + resp) ); + } + + static void cGoWakuDisconnectPeerById(void* wakuCtx, char* peerId, void* resp) { + WAKU_CALL( waku_disconnect_peer_by_id(wakuCtx, + peerId, + (WakuCallBack) callback, + resp) ); + } + + static void cGoWakuListenAddresses(void* wakuCtx, void* resp) { WAKU_CALL (waku_listen_addresses(wakuCtx, (WakuCallBack) callback, resp) ); } - void cGoWakuGetMyENR(void* ctx, void* resp) { + static void cGoWakuGetMyENR(void* ctx, void* resp) { WAKU_CALL (waku_get_my_enr(ctx, (WakuCallBack) callback, resp) ); } - void cGoWakuListPeersInMesh(void* ctx, char* pubSubTopic, void* resp) { + static void cGoWakuListPeersInMesh(void* ctx, char* pubSubTopic, void* resp) { WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); } - void cGoWakuGetNumConnectedPeers(void* ctx, char* pubSubTopic, void* resp) { + static void cGoWakuGetNumConnectedPeers(void* ctx, char* pubSubTopic, void* resp) { WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); } - void cGoWakuLightpushPublish(void* wakuCtx, + static void cGoWakuLightpushPublish(void* wakuCtx, const char* pubSubTopic, const char* jsonWakuMessage, void* resp) { @@ -207,7 +228,7 @@ package wakuv2 resp)); } - void cGoWakuStoreQuery(void* wakuCtx, + static void cGoWakuStoreQuery(void* wakuCtx, const char* jsonQuery, const char* peerAddr, int timeoutMs, @@ -221,7 +242,7 @@ package wakuv2 resp)); } - void cGoWakuPeerExchangeQuery(void* wakuCtx, + static void cGoWakuPeerExchangeQuery(void* wakuCtx, uint64_t numPeers, void* resp) { @@ -231,7 +252,7 @@ package wakuv2 resp)); } - void cGoWakuGetPeerIdsByProtocol(void* wakuCtx, + static void cGoWakuGetPeerIdsByProtocol(void* wakuCtx, const char* protocol, void* resp) { @@ -253,13 +274,10 @@ import ( "encoding/json" "errors" "fmt" - "os" - "os/signal" "runtime" "strconv" "strings" "sync" - "syscall" "time" "unsafe" @@ -269,7 +287,6 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/rpc" "github.com/jellydator/ttlcache/v3" "github.com/libp2p/go-libp2p/core/metrics" "github.com/libp2p/go-libp2p/core/peer" @@ -294,6 +311,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/store" storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/utils" @@ -314,6 +332,7 @@ const peersToPublishForLightpush = 2 const publishingLimiterRate = rate.Limit(2) const publishingLimitBurst = 4 +/* TODO-nwaku type SentEnvelope struct { Envelope *protocol.Envelope PublishMethod PublishMethod @@ -334,7 +353,7 @@ type ITelemetryClient interface { func (w *Waku) SetStatusTelemetryClient(client ITelemetryClient) { w.statusTelemetryClient = client -} +} */ func newTTLCache() *ttlcache.Cache[gethcommon.Hash, *common.ReceivedMessage] { cache := ttlcache.New[gethcommon.Hash, *common.ReceivedMessage](ttlcache.WithTTL[gethcommon.Hash, *common.ReceivedMessage](cacheTTL)) @@ -350,6 +369,7 @@ func (w *Waku) SubscribeToConnStatusChanges() *types.ConnStatusSubscription { return subscription } +/* TODO-nwaku func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string) ([]*enode.Node, error) { wg := sync.WaitGroup{} mu := sync.Mutex{} @@ -475,7 +495,7 @@ func (w *Waku) discoverAndConnectPeers() { go w.connect(*peerInfo, nil, wps.Static) } } -} +} */ func (w *Waku) connect(peerInfo peer.AddrInfo, enr *enode.Node, origin wps.Origin) { // Connection will be prunned eventually by the connection manager if needed @@ -484,6 +504,7 @@ func (w *Waku) connect(peerInfo peer.AddrInfo, enr *enode.Node, origin wps.Origi w.WakuConnect(addr.String(), 1000) } +/* TODO-nwaku func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) { w.wg.Add(1) defer w.wg.Done() @@ -596,7 +617,7 @@ func (w *Waku) APIs() []rpc.API { Public: false, }, } -} +} */ // Protocols returns the waku sub-protocols ran by this particular client. func (w *Waku) Protocols() []p2p.Protocol { @@ -849,6 +870,7 @@ func (w *Waku) GetSymKey(id string) ([]byte, error) { return nil, fmt.Errorf("non-existent key ID") } +/* TODO-nwaku // Subscribe installs a new message handler used for filtering, decrypting // and subsequent storing of incoming messages. func (w *Waku) Subscribe(f *common.Filter) (string, error) { @@ -906,7 +928,7 @@ func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) { return } w.messageSentCheck.DeleteByMessageIDs(hashes) -} +} */ func (w *Waku) SetStorePeerID(peerID peer.ID) { if w.messageSentCheck != nil { @@ -999,10 +1021,11 @@ func (w *Waku) Query(ctx context.Context, return nil, 0, nil } +/* TODO-nwaku // OnNewEnvelope is an interface from Waku FilterManager API that gets invoked when any new message is received by Filter. func (w *Waku) OnNewEnvelope(env *protocol.Envelope) error { return w.OnNewEnvelopes(env, common.RelayedMessageType, false) -} +} */ // Start implements node.Service, starting the background data propagation thread // of the Waku protocol. @@ -1024,10 +1047,6 @@ func (w *Waku) Start() error { return err } - ch := make(chan os.Signal, 1) - signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) - <-ch - // if err = w.node.Start(w.ctx); err != nil { // return fmt.Errorf("failed to start go-waku node: %v", err) // } @@ -1194,6 +1213,7 @@ func (w *Waku) MessageExists(mh pb.MessageHash) (bool, error) { return w.envelopeCache.Has(gethcommon.Hash(mh)), nil } +/* TODO-nwaku func (w *Waku) SetTopicsToVerifyForMissingMessages(peerID peer.ID, pubsubTopic string, contentTopics []string) { if !w.cfg.EnableMissingMessageVerification { return @@ -1234,8 +1254,9 @@ func (w *Waku) setupRelaySubscriptions() error { } return nil -} +} */ +/* TODO-nwaku func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.MessageType, processImmediately bool) error { if envelope == nil { return nil @@ -1318,13 +1339,14 @@ func (w *Waku) add(recvMessage *common.ReceivedMessage, processImmediately bool) } return true, nil -} +} */ // postEvent queues the message for further processing. func (w *Waku) postEvent(envelope *common.ReceivedMessage) { w.msgQueue <- envelope } +/* TODO-nwaku // processQueueLoop delivers the messages to the watchers during the lifetime of the waku node. func (w *Waku) processQueueLoop() { if w.ctx == nil { @@ -1379,7 +1401,7 @@ func (w *Waku) processMessage(e *common.ReceivedMessage) { Hash: e.Hash(), Event: common.EventEnvelopeAvailable, }) -} +} */ // GetEnvelope retrieves an envelope from the message queue by its hash. // It returns nil if the envelope can not be found. @@ -1421,6 +1443,7 @@ func (w *Waku) Peers() types.PeerStats { // return FormatPeerStats(w.node) } +/* TODO-nwaku func (w *Waku) RelayPeersByTopic(topic string) (*types.PeerList, error) { if w.cfg.LightClient { return nil, errors.New("only available in relay mode") @@ -1483,7 +1506,7 @@ func (w *Waku) RemovePubsubTopicKey(topic string) error { } return w.protectedTopicStore.Delete(topic) -} +} */ func (w *Waku) handleNetworkChangeFromApp(state connection.State) { //If connection state is reported by something other than peerCount becoming 0 e.g from mobile app, disconnect all peers @@ -1497,6 +1520,7 @@ func (w *Waku) handleNetworkChangeFromApp(state connection.State) { // } } +/* TODO-nwaku func (w *Waku) ConnectionChanged(state connection.State) { isOnline := !state.Offline if w.cfg.LightClient { @@ -1519,7 +1543,7 @@ func (w *Waku) ConnectionChanged(state connection.State) { w.onlineChecker.SetOnline(isOnline) } w.state = state -} +} */ func (w *Waku) AddStorePeer(address multiaddr.Multiaddr) (peer.ID, error) { // peerID, err := w.node.AddPeer(address, wps.Static, w.cfg.DefaultShardedPubsubTopics, store.StoreQueryID_v300) @@ -1550,11 +1574,8 @@ func (w *Waku) DialPeer(address multiaddr.Multiaddr) error { return nil } -func (w *Waku) DialPeerByID(peerID peer.ID) error { - // ctx, cancel := context.WithTimeout(w.ctx, requestTimeout) - // defer cancel() - // return w.node.DialPeerByID(ctx, peerID) - return nil +func (w *Waku) DialPeerByID(peerId peer.ID) error { + return w.WakuDialPeerById(peerId, string(relay.WakuRelayID_v200), 1000) } func (w *Waku) DropPeer(peerID peer.ID) error { @@ -1667,15 +1688,21 @@ type WakuPubsubTopic = string type WakuContentTopic = string type WakuConfig struct { - Host string `json:"host,omitempty"` - Port int `json:"port,omitempty"` - NodeKey string `json:"key,omitempty"` - EnableRelay bool `json:"relay"` - LogLevel string `json:"logLevel"` + Host string `json:"host,omitempty"` + Port int `json:"port,omitempty"` + NodeKey string `json:"key,omitempty"` + EnableRelay bool `json:"relay"` + LogLevel string `json:"logLevel"` + DnsDiscovery bool `json:"dnsDiscovery,omitempty"` + DnsDiscoveryUrl string `json:"dnsDiscoveryUrl,omitempty"` + MaxMessageSize string `json:"maxMessageSize,omitempty"` + Staticnodes []string `json:"staticnodes,omitempty"` + Discv5BootstrapNodes []string `json:"discv5BootstrapNodes,omitempty"` + Discv5Discovery bool `json:"discv5Discovery,omitempty"` + ClusterID uint16 `json:"clusterId,omitempty"` + Shards []uint16 `json:"shards,omitempty"` } -var jamon unsafe.Pointer - type Waku struct { wakuCtx unsafe.Pointer @@ -1710,7 +1737,7 @@ type Waku struct { cancel context.CancelFunc wg sync.WaitGroup - cfg *Config + cfg *WakuConfig options []node.WakuNodeOption envelopeFeed event.Feed @@ -1745,7 +1772,7 @@ type Waku struct { onHistoricMessagesRequestFailed func([]byte, peer.ID, error) onPeerStats func(types.ConnStatus) - statusTelemetryClient ITelemetryClient + // statusTelemetryClient ITelemetryClient // TODO-nwaku defaultShardInfo protocol.RelayShards } @@ -1769,20 +1796,12 @@ func printStackTrace() { func wakuNew(nodeKey *ecdsa.PrivateKey, fleet string, - cfg *Config, + cfg *WakuConfig, logger *zap.Logger, appDB *sql.DB, ts *timesource.NTPTimeSource, onHistoricMessagesRequestFailed func([]byte, peer.ID, error), onPeerStats func(types.ConnStatus)) (*Waku, error) { - nwakuConfig := WakuConfig{ - Host: cfg.Host, - Port: 30303, - NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", - EnableRelay: true, - LogLevel: "DEBUG", - } - var err error if logger == nil { logger, err = zap.NewDevelopment() @@ -1794,14 +1813,15 @@ func wakuNew(nodeKey *ecdsa.PrivateKey, ts = timesource.Default() } + /* TODO-nwaku cfg = setDefaults(cfg) if err = cfg.Validate(logger); err != nil { return nil, err - } + } */ ctx, cancel := context.WithCancel(context.Background()) - jsonConfig, err := json.Marshal(nwakuConfig) + jsonConfig, err := json.Marshal(cfg) if err != nil { return nil, err } @@ -1813,7 +1833,6 @@ func wakuNew(nodeKey *ecdsa.PrivateKey, defer C.freeResp(resp) wakuCtx := C.cGoWakuNew(cJsonConfig, resp) - jamon = wakuCtx // Notice that the events for self node are handled by the 'MyEventCallback' method if C.getRet(resp) == C.RET_OK { @@ -1837,11 +1856,11 @@ func wakuNew(nodeKey *ecdsa.PrivateKey, timesource: ts, storeMsgIDsMu: sync.RWMutex{}, logger: logger, - discV5BootstrapNodes: cfg.DiscV5BootstrapNodes, + discV5BootstrapNodes: cfg.Discv5BootstrapNodes, onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed, onPeerStats: onPeerStats, onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker), - sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish), + //sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish), // TODO-nwaku }, nil } @@ -2155,6 +2174,24 @@ func (self *Waku) WakuConnect(peerMultiAddr string, timeoutMs int) error { return errors.New(errMsg) } +func (self *Waku) WakuDialPeerById(peerId peer.ID, protocol string, timeoutMs int) error { + var resp = C.allocResp() + var cPeerId = C.CString(peerId.String()) + var cProtocol = C.CString(protocol) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPeerId)) + defer C.free(unsafe.Pointer(cProtocol)) + + C.cGoWakuDialPeerById(self.wakuCtx, cPeerId, cProtocol, C.int(timeoutMs), resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + errMsg := "error DialPeerById: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + func (self *Waku) ListenAddresses() ([]multiaddr.Multiaddr, error) { var resp = C.allocResp() defer C.freeResp(resp) @@ -2245,12 +2282,12 @@ func (self *Waku) GetNumConnectedPeers(paramPubsubTopic ...string) (int, error) numPeers, err := strconv.Atoi(numPeersStr) if err != nil { fmt.Println(":", err) - errMsg := "ListPeersInMesh - error converting string to int: " + err.Error() + errMsg := "GetNumConnectedPeers - error converting string to int: " + err.Error() return 0, errors.New(errMsg) } return numPeers, nil } - errMsg := "error ListPeersInMesh: " + + errMsg := "error GetNumConnectedPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) return 0, errors.New(errMsg) } @@ -2265,6 +2302,9 @@ func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) { if C.getRet(resp) == C.RET_OK { peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + if peersStr == "" { + return peer.IDSlice{}, nil + } // peersStr contains a comma-separated list of peer ids itemsPeerIds := strings.Split(peersStr, ",") @@ -2272,7 +2312,7 @@ func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) { for _, peer := range itemsPeerIds { id, err := peermod.Decode(peer) if err != nil { - errMsg := "ListPeersInMesh - error converting string to int: " + err.Error() + errMsg := "GetPeerIdsByProtocol - error converting string to int: " + err.Error() return nil, errors.New(errMsg) } peers = append(peers, id) @@ -2285,6 +2325,22 @@ func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) { return nil, errors.New(errMsg) } +func (self *Waku) DisconnectPeerById(peerId peer.ID) error { + var resp = C.allocResp() + var cPeerId = C.CString(peerId.String()) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPeerId)) + + C.cGoWakuDisconnectPeerById(self.wakuCtx, cPeerId, resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + errMsg := "error DisconnectPeerById: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + // func main() { // config := WakuConfig{ @@ -2379,14 +2435,15 @@ func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) { // } // MaxMessageSize returns the maximum accepted message size. +/* TODO-nwaku func (w *Waku) MaxMessageSize() uint32 { return w.cfg.MaxMessageSize -} +} */ // New creates a WakuV2 client ready to communicate through the LibP2P network. func New(nodeKey *ecdsa.PrivateKey, fleet string, - cfg *Config, + cfg *WakuConfig, logger *zap.Logger, appDB *sql.DB, ts *timesource.NTPTimeSource, diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 2c1490cb245..8f2ead7bb0a 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -5,14 +5,14 @@ package wakuv2 import ( "context" - "crypto/rand" "errors" - "math/big" - "os" + "slices" "testing" "time" "github.com/cenkalti/backoff/v3" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" @@ -20,16 +20,8 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/stretchr/testify/require" - "golang.org/x/exp/maps" - "google.golang.org/protobuf/proto" - - "github.com/waku-org/go-waku/waku/v2/dnsdisc" - "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/status-im/status-go/protocol/tt" - "github.com/status-im/status-go/wakuv2/common" ) var testStoreENRBootstrap = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@store.staging.status.nodes.status.im" @@ -161,31 +153,30 @@ func parseNodes(rec []string) []*enode.Node { // Using Docker: // // IP_ADDRESS=$(hostname -I | awk '{print $1}'); -// docker run \ -// -p 60000:60000/tcp -p 9000:9000/udp -p 8645:8645/tcp harbor.status.im/wakuorg/nwaku:v0.31.0 \ -// --tcp-port=60000 --discv5-discovery=true --cluster-id=16 --pubsub-topic=/waku/2/rs/16/32 --pubsub-topic=/waku/2/rs/16/64 \ -// --nat=extip:${IP_ADDRESS} --discv5-discovery --discv5-udp-port=9000 --rest-address=0.0.0.0 --store +// docker run \ +// -p 61000:61000/tcp -p 8000:8000/udp -p 8646:8646/tcp harbor.status.im/wakuorg/nwaku:v0.33.0 \ +// --discv5-discovery=true --cluster-id=16 --log-level=DEBUG \ +// --nat=extip:${IP_ADDRESS} --discv5-udp-port=8000 --rest-address=0.0.0.0 --store --rest-port=8646 \ func TestBasicWakuV2(t *testing.T) { - nwakuInfo, err := GetNwakuInfo(nil, nil) - require.NoError(t, err) - - // Creating a fake DNS Discovery ENRTree - tree, url := makeTestTree("n", parseNodes([]string{nwakuInfo.EnrUri}), nil) - enrTreeAddress := url - envEnrTreeAddress := os.Getenv("ENRTREE_ADDRESS") - if envEnrTreeAddress != "" { - enrTreeAddress = envEnrTreeAddress + extNodeRestPort := 8646 + storeNodeInfo, err := GetNwakuInfo(nil, &extNodeRestPort) + require.NoError(t, err) + + nwakuConfig := WakuConfig{ + Port: 30303, + NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", + EnableRelay: true, + LogLevel: "DEBUG", + DnsDiscoveryUrl: "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im", + DnsDiscovery: true, + Discv5Discovery: true, + Staticnodes: []string{storeNodeInfo.ListenAddresses[0]}, + ClusterID: 16, + Shards: []uint16{64}, } - - config := &Config{} - setDefaultConfig(config, false) - config.Port = 0 - config.Resolver = mapResolver(tree.ToTXT("n")) - config.DiscV5BootstrapNodes = []string{enrTreeAddress} - config.DiscoveryLimit = 20 - config.WakuNodes = []string{enrTreeAddress} - w, err := New(nil, "", config, nil, nil, nil, nil, nil) + + w, err := New(nil, "", &nwakuConfig, nil, nil, nil, nil, nil) require.NoError(t, err) require.NoError(t, w.Start()) @@ -193,48 +184,55 @@ func TestBasicWakuV2(t *testing.T) { require.NoError(t, err) require.NotNil(t, enr) - // DNSDiscovery - ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second) - defer cancel() - - discoveredNodes, err := dnsdisc.RetrieveNodes(ctx, enrTreeAddress, dnsdisc.WithResolver(config.Resolver)) - require.NoError(t, err) - - // Peer used for retrieving history - r, err := rand.Int(rand.Reader, big.NewInt(int64(len(discoveredNodes)))) - require.NoError(t, err) - - storeNode := discoveredNodes[int(r.Int64())] - options := func(b *backoff.ExponentialBackOff) { b.MaxElapsedTime = 30 * time.Second } // Sanity check, not great, but it's probably helpful err = tt.RetryWithBackOff(func() error { - if len(w.Peers()) < 1 { - return errors.New("no peers discovered") + + numConnected, err := w.GetNumConnectedPeers() + if err != nil { + return err } - return nil + // Have to be connected to at least 3 nodes: the static node, the bootstrap node, and one discovered node + if numConnected > 2 { + return nil + } + return errors.New("no peers discovered") }, options) require.NoError(t, err) - // Dropping Peer - err = w.DropPeer(storeNode.PeerID) + // Get local store node address + storeNode, err :=peer.AddrInfoFromString(storeNodeInfo.ListenAddresses[0]) + require.NoError(t, err) + require.NoError(t, err) + + // Check that we are indeed connected to the store node + connectedStoreNodes, err := w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300)) require.NoError(t, err) + require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node") - // Dialing with peerID - err = w.DialPeerByID(storeNode.PeerID) + // Disconnect from the store node + err = w.DisconnectPeerById(storeNode.ID) require.NoError(t, err) - err = tt.RetryWithBackOff(func() error { - if len(w.Peers()) < 1 { - return errors.New("no peers discovered") - } - return nil - }, options) + // Check that we are indeed disconnected + connectedStoreNodes, err = w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300)) + require.NoError(t, err) + isDisconnected := !slices.Contains(connectedStoreNodes, storeNode.ID) + require.True(t, isDisconnected, "nwaku should be disconnected from the store node") + + // Re-connect + err = w.DialPeerByID(storeNode.ID) require.NoError(t, err) + // Check that we are connected again + connectedStoreNodes, err = w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300)) + require.NoError(t, err) + require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node") + + /* filter := &common.Filter{ PubsubTopic: config.DefaultShardPubsubTopic, Messages: common.NewMemoryMessageStore(), @@ -292,7 +290,7 @@ func TestBasicWakuV2(t *testing.T) { } return nil }, options) - require.NoError(t, err) + require.NoError(t, err) */ require.NoError(t, w.Stop()) }