Skip to content

Commit 0a21162

Browse files
Remove unused code (#2317)
1 parent 8bf9be9 commit 0a21162

File tree

5 files changed

+7
-1161
lines changed

5 files changed

+7
-1161
lines changed

p2p/p2p.go

+2-99
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"fmt"
88
"math/rand"
99
"strings"
10-
"sync"
1110
"time"
1211

1312
"github.com/Masterminds/semver/v3"
@@ -21,7 +20,6 @@ import (
2120
pubsub "github.com/libp2p/go-libp2p-pubsub"
2221
"github.com/libp2p/go-libp2p/core/crypto"
2322
"github.com/libp2p/go-libp2p/core/crypto/pb"
24-
"github.com/libp2p/go-libp2p/core/event"
2523
"github.com/libp2p/go-libp2p/core/host"
2624
"github.com/libp2p/go-libp2p/core/network"
2725
"github.com/libp2p/go-libp2p/core/peer"
@@ -43,10 +41,8 @@ type Service struct {
4341
handler *starknet.Handler
4442
log utils.SimpleLogger
4543

46-
dht *dht.IpfsDHT
47-
pubsub *pubsub.PubSub
48-
topics map[string]*pubsub.Topic
49-
topicsLock sync.RWMutex
44+
dht *dht.IpfsDHT
45+
pubsub *pubsub.PubSub
5046

5147
synchroniser *syncService
5248
gossipTracer *gossipTracer
@@ -157,7 +153,6 @@ func NewWithHost(p2phost host.Host, peers string, feederNode bool, bc *blockchai
157153
network: snNetwork,
158154
dht: p2pdht,
159155
feederNode: feederNode,
160-
topics: make(map[string]*pubsub.Topic),
161156
handler: starknet.NewHandler(bc, log),
162157
database: database,
163158
}
@@ -204,34 +199,6 @@ func privateKey(privKeyStr string) (crypto.PrivKey, error) {
204199
return prvKey, nil
205200
}
206201

207-
func (s *Service) SubscribePeerConnectednessChanged(ctx context.Context) (<-chan event.EvtPeerConnectednessChanged, error) {
208-
ch := make(chan event.EvtPeerConnectednessChanged)
209-
sub, err := s.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
210-
if err != nil {
211-
return nil, err
212-
}
213-
214-
go func() {
215-
for {
216-
select {
217-
case <-ctx.Done():
218-
if err = sub.Close(); err != nil {
219-
s.log.Warnw("Failed to close subscription", "err", err)
220-
}
221-
close(ch)
222-
return
223-
case evnt := <-sub.Out():
224-
typedEvnt := evnt.(event.EvtPeerConnectednessChanged)
225-
if typedEvnt.Connectedness == network.Connected {
226-
ch <- typedEvnt
227-
}
228-
}
229-
}
230-
}()
231-
232-
return ch, nil
233-
}
234-
235202
// Run starts the p2p service. Calling any other function before run is undefined behaviour
236203
func (s *Service) Run(ctx context.Context) error {
237204
defer func() {
@@ -336,70 +303,6 @@ func (s *Service) NewStream(ctx context.Context, pids ...protocol.ID) (network.S
336303
}
337304
}
338305

339-
func (s *Service) joinTopic(topic string) (*pubsub.Topic, error) {
340-
existingTopic := func() *pubsub.Topic {
341-
s.topicsLock.RLock()
342-
defer s.topicsLock.RUnlock()
343-
if t, found := s.topics[topic]; found {
344-
return t
345-
}
346-
return nil
347-
}()
348-
349-
if existingTopic != nil {
350-
return existingTopic, nil
351-
}
352-
353-
newTopic, err := s.pubsub.Join(topic)
354-
if err != nil {
355-
return nil, err
356-
}
357-
358-
s.topicsLock.Lock()
359-
defer s.topicsLock.Unlock()
360-
s.topics[topic] = newTopic
361-
return newTopic, nil
362-
}
363-
364-
func (s *Service) SubscribeToTopic(topic string) (chan []byte, func(), error) {
365-
t, joinErr := s.joinTopic(topic)
366-
if joinErr != nil {
367-
return nil, nil, joinErr
368-
}
369-
370-
sub, subErr := t.Subscribe()
371-
if subErr != nil {
372-
return nil, nil, subErr
373-
}
374-
375-
const bufferSize = 16
376-
ch := make(chan []byte, bufferSize)
377-
// go func() {
378-
// for {
379-
// msg, err := sub.Next(s.runCtx)
380-
// if err != nil {
381-
// close(ch)
382-
// return
383-
// }
384-
// only forward messages delivered by others
385-
// if msg.ReceivedFrom == s.host.ID() {
386-
// continue
387-
// }
388-
//
389-
// select {
390-
// case ch <- msg.GetData():
391-
// case <-s.runCtx.Done():
392-
// }
393-
// }
394-
// }()
395-
return ch, sub.Cancel, nil
396-
}
397-
398-
func (s *Service) PublishOnTopic(topic string) error {
399-
_, err := s.joinTopic(topic)
400-
return err
401-
}
402-
403306
func (s *Service) SetProtocolHandler(pid protocol.ID, handler func(network.Stream)) {
404307
s.host.SetStreamHandler(pid, handler)
405308
}

p2p/p2p_test.go

-142
Original file line numberDiff line numberDiff line change
@@ -1,141 +1,17 @@
11
package p2p_test
22

33
import (
4-
"context"
5-
"io"
6-
"strings"
7-
"sync"
84
"testing"
9-
"time"
105

116
"github.com/NethermindEth/juno/db"
127
"github.com/NethermindEth/juno/db/pebble"
138
"github.com/NethermindEth/juno/p2p"
149
"github.com/NethermindEth/juno/utils"
15-
"github.com/libp2p/go-libp2p/core/network"
1610
"github.com/libp2p/go-libp2p/core/peer"
17-
"github.com/libp2p/go-libp2p/core/protocol"
18-
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
1911
"github.com/multiformats/go-multiaddr"
2012
"github.com/stretchr/testify/require"
2113
)
2214

23-
func TestService(t *testing.T) {
24-
t.Skip("TestService")
25-
net, err := mocknet.FullMeshLinked(2)
26-
require.NoError(t, err)
27-
peerHosts := net.Hosts()
28-
require.Len(t, peerHosts, 2)
29-
30-
timeout := time.Second
31-
testCtx, cancel := context.WithCancel(context.Background())
32-
t.Cleanup(cancel)
33-
peerA, err := p2p.NewWithHost(
34-
peerHosts[0],
35-
"",
36-
false,
37-
nil,
38-
&utils.Integration,
39-
utils.NewNopZapLogger(),
40-
nil,
41-
)
42-
require.NoError(t, err)
43-
44-
events, err := peerA.SubscribePeerConnectednessChanged(testCtx)
45-
require.NoError(t, err)
46-
47-
peerAddrs, err := peerA.ListenAddrs()
48-
require.NoError(t, err)
49-
50-
peerAddrsString := make([]string, 0, len(peerAddrs))
51-
for _, addr := range peerAddrs {
52-
peerAddrsString = append(peerAddrsString, addr.String())
53-
}
54-
55-
peerB, err := p2p.NewWithHost(
56-
peerHosts[1],
57-
strings.Join(peerAddrsString, ","),
58-
true,
59-
nil,
60-
&utils.Integration,
61-
utils.NewNopZapLogger(),
62-
nil,
63-
)
64-
require.NoError(t, err)
65-
66-
wg := sync.WaitGroup{}
67-
wg.Add(2)
68-
go func() {
69-
defer wg.Done()
70-
require.NoError(t, peerA.Run(testCtx))
71-
}()
72-
go func() {
73-
defer wg.Done()
74-
require.NoError(t, peerB.Run(testCtx))
75-
}()
76-
77-
select {
78-
case evt := <-events:
79-
require.Equal(t, network.Connected, evt.Connectedness)
80-
case <-time.After(timeout):
81-
require.True(t, false, "no events were emitted")
82-
}
83-
84-
t.Run("gossip", func(t *testing.T) {
85-
t.Skip() // todo: flaky test
86-
topic := "coolTopic"
87-
ch, closer, err := peerA.SubscribeToTopic(topic)
88-
require.NoError(t, err)
89-
t.Cleanup(closer)
90-
91-
maxRetries := 4
92-
RetryLoop:
93-
for i := 0; i < maxRetries; i++ {
94-
gossipedMessage := []byte(`veryImportantMessage`)
95-
require.NoError(t, peerB.PublishOnTopic(topic))
96-
97-
select {
98-
case <-time.After(time.Second):
99-
if i == maxRetries-1 {
100-
require.Fail(t, "timeout: never received the message")
101-
}
102-
case msg := <-ch:
103-
require.Equal(t, gossipedMessage, msg)
104-
break RetryLoop
105-
}
106-
}
107-
})
108-
109-
t.Run("protocol handler", func(t *testing.T) {
110-
ch := make(chan []byte)
111-
112-
superSecretProtocol := protocol.ID("superSecretProtocol")
113-
peerA.SetProtocolHandler(superSecretProtocol, func(stream network.Stream) {
114-
read, err := io.ReadAll(stream)
115-
require.NoError(t, err)
116-
ch <- read
117-
})
118-
119-
peerAStream, err := peerB.NewStream(testCtx, superSecretProtocol)
120-
require.NoError(t, err)
121-
122-
superSecretMessage := []byte(`superSecretMessage`)
123-
_, err = peerAStream.Write(superSecretMessage)
124-
require.NoError(t, err)
125-
require.NoError(t, peerAStream.Close())
126-
127-
select {
128-
case <-time.After(timeout):
129-
require.Equal(t, true, false)
130-
case msg := <-ch:
131-
require.Equal(t, superSecretMessage, msg)
132-
}
133-
})
134-
135-
cancel()
136-
wg.Wait()
137-
}
138-
13915
func TestInvalidKey(t *testing.T) {
14016
_, err := p2p.New(
14117
"/ip4/127.0.0.1/tcp/30301",
@@ -153,24 +29,6 @@ func TestInvalidKey(t *testing.T) {
15329
require.Error(t, err)
15430
}
15531

156-
func TestValidKey(t *testing.T) {
157-
t.Skip("TestValidKey")
158-
_, err := p2p.New(
159-
"/ip4/127.0.0.1/tcp/30301",
160-
"",
161-
"peerA",
162-
"",
163-
"08011240333b4a433f16d7ca225c0e99d0d8c437b835cb74a98d9279c561977690c80f681b25ccf3fa45e2f2de260149c112fa516b69057dd3b0151a879416c0cb12d9b3",
164-
false,
165-
nil,
166-
&utils.Integration,
167-
utils.NewNopZapLogger(),
168-
nil,
169-
)
170-
171-
require.NoError(t, err)
172-
}
173-
17432
func TestLoadAndPersistPeers(t *testing.T) {
17533
testDB := pebble.NewMemTest(t)
17634

p2p/starknet/handlers.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -106,23 +106,23 @@ func streamHandler[ReqT proto.Message](ctx context.Context, wg *sync.WaitGroup,
106106
}
107107

108108
func (h *Handler) HeadersHandler(stream network.Stream) {
109-
streamHandler[*spec.BlockHeadersRequest](h.ctx, &h.wg, stream, h.onHeadersRequest, h.log)
109+
streamHandler(h.ctx, &h.wg, stream, h.onHeadersRequest, h.log)
110110
}
111111

112112
func (h *Handler) EventsHandler(stream network.Stream) {
113-
streamHandler[*spec.EventsRequest](h.ctx, &h.wg, stream, h.onEventsRequest, h.log)
113+
streamHandler(h.ctx, &h.wg, stream, h.onEventsRequest, h.log)
114114
}
115115

116116
func (h *Handler) TransactionsHandler(stream network.Stream) {
117-
streamHandler[*spec.TransactionsRequest](h.ctx, &h.wg, stream, h.onTransactionsRequest, h.log)
117+
streamHandler(h.ctx, &h.wg, stream, h.onTransactionsRequest, h.log)
118118
}
119119

120120
func (h *Handler) ClassesHandler(stream network.Stream) {
121-
streamHandler[*spec.ClassesRequest](h.ctx, &h.wg, stream, h.onClassesRequest, h.log)
121+
streamHandler(h.ctx, &h.wg, stream, h.onClassesRequest, h.log)
122122
}
123123

124124
func (h *Handler) StateDiffHandler(stream network.Stream) {
125-
streamHandler[*spec.StateDiffsRequest](h.ctx, &h.wg, stream, h.onStateDiffRequest, h.log)
125+
streamHandler(h.ctx, &h.wg, stream, h.onStateDiffRequest, h.log)
126126
}
127127

128128
func (h *Handler) onHeadersRequest(req *spec.BlockHeadersRequest) (iter.Seq[proto.Message], error) {

0 commit comments

Comments
 (0)