-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlistener_service.go
129 lines (113 loc) · 2.83 KB
/
listener_service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package discovery
import (
"net"
"context"
"github.com/iain17/discovery/pb"
"github.com/iain17/logger"
"errors"
"fmt"
)
// ListenerService accepts inbound TCP connections from prospective peers and
// registers the ones that complete the peer-info handshake with the local
// node's net table.
type ListenerService struct {
// localNode is the owning node: init records the listening port on it and
// signals its wait groups; process reads its id and its net table.
localNode *LocalNode
// context is the context handed to Serve (stored by init); the accept loop
// checks it between connections.
context context.Context
// listener is not referenced by any method in this file.
// NOTE(review): possibly a leftover from a packet-based listener - confirm
// against the rest of the package before removing.
listener net.PacketConn
// socket is the TCP listener opened by init on ":0".
socket net.Listener
// logger is created in init and named after String().
logger *logger.Logger
}
// String returns the fixed name of this service. init passes it to logger.New
// when it creates the service's logger.
func (l *ListenerService) String() string {
	const name = "listener"
	return name
}
// init prepares the service for accepting connections.
//
// It creates the logger, stores ctx, opens a TCP listener on ":0" and records
// the port of the resulting address on the local node. The local node's wait
// groups are signalled on every return path, including failure.
//
// It returns an error when the TCP listener cannot be opened.
func (l *ListenerService) init(ctx context.Context) error {
	// Runs on every exit path. coreWg is cleared after being signalled; wg is
	// left in place.
	signalWaitGroups := func() {
		node := l.localNode
		if node.wg != nil {
			node.wg.Done()
		}
		if node.coreWg != nil {
			node.coreWg.Done()
			node.coreWg = nil
		}
	}
	defer signalWaitGroups()

	l.logger = logger.New(l.String())
	l.context = ctx

	//stunErr := l.localNode.StunService.Serve(ctx)
	//if stunErr != nil {
	//	logger.Warningf("Stun error: %s", stunErr)
	//}

	socket, listenErr := net.Listen("tcp", ":0")
	l.socket = socket
	if listenErr != nil {
		return fmt.Errorf("could not listen: %s", listenErr.Error())
	}

	tcpAddr := socket.Addr().(*net.TCPAddr)
	l.localNode.port = tcpAddr.Port
	l.logger.Infof("listening on %d", l.localNode.port)
	return nil
}
// Serve opens the listening socket and accepts inbound peer connections until
// ctx is cancelled.
//
// Every accepted connection is checked against the net table's blacklist and,
// if allowed, handed to process on its own goroutine. Serve blocks until ctx
// is done; returning then runs the deferred Stop, whose closing of the socket
// unblocks the accept loop so it can exit as well.
//
// Serve panics if init fails; the error is stored in localNode.lastError
// before panicking.
func (l *ListenerService) Serve(ctx context.Context) {
	if err := l.init(ctx); err != nil {
		l.localNode.lastError = err
		panic(err)
	}
	// Registered only after init succeeded: there is nothing to close before
	// that, and a panic raised inside a deferred call would hide the init
	// error.
	defer l.Stop()

	go l.acceptLoop(ctx)

	<-ctx.Done()
}

// acceptLoop accepts connections until ctx is cancelled or Accept fails with a
// non-temporary error, which is what a closed listener reports.
func (l *ListenerService) acceptLoop(ctx context.Context) {
	for {
		if ctx.Err() != nil {
			l.logger.Info("Stopped listening.")
			return
		}

		l.logger.Debug("Listening...")
		conn, err := l.socket.Accept()
		if err != nil {
			if ctx.Err() != nil {
				l.logger.Info("Stopped listening.")
				return
			}
			// Same distinction net/http's accept loop makes: retry transient
			// failures, give up on permanent ones instead of spinning on a
			// socket that will never accept again.
			if ne, ok := err.(net.Error); ok && ne.Temporary() {
				continue
			}
			l.logger.Warningf("accept failed, no longer listening: %s", err)
			return
		}

		key := conn.RemoteAddr().String()
		if _, banned := l.localNode.netTableService.blackList.Get(key); banned {
			// Best effort: nothing useful can be done if closing fails.
			conn.Close()
			continue
		}

		go l.handleInbound(conn)
	}
}

// handleInbound runs the handshake for one accepted connection and closes the
// connection when the handshake fails.
func (l *ListenerService) handleInbound(conn net.Conn) {
	remote := conn.RemoteAddr().String()
	l.logger.Debugf("new connection from %s", remote)

	err := l.process(conn)
	if err == nil {
		return
	}
	// Best effort: the connection is being discarded either way.
	conn.Close()

	// These two outcomes are expected during normal operation and are not
	// worth a warning.
	if msg := err.Error(); msg == "peer reset" || msg == "we can't add ourselves" {
		return
	}
	l.logger.Warningf("[%s] error on processing new connection, %s", remote, err)
}
// Stop closes the listening socket, if one has been opened.
//
// It is safe to call before the socket or the logger exist, so it can be used
// on a service whose initialisation never ran or failed. A failure to close
// the socket is logged at debug level rather than returned.
func (l *ListenerService) Stop() {
	if l.logger != nil {
		l.logger.Info("Stopping...")
	}
	if l.socket == nil {
		return
	}
	if err := l.socket.Close(); err != nil && l.logger != nil {
		l.logger.Debugf("closing listener socket: %s", err)
	}
}
// process performs the peer-info handshake on an inbound connection and, when
// the remote side is a peer we are not yet connected to, registers it with the
// net table.
//
// The handshake sends our peer info first and then decodes the remote peer
// info from c. An error is returned when either step fails or when the remote
// peer carries our own id; closing c in those cases is left to the caller.
// When the peer is already connected, c is redundant: it is closed here and
// nil is returned.
func (l *ListenerService) process(c net.Conn) error {
	rn := NewRemoteNode(c, l.localNode)

	rn.logger.Debug("Sending our peer info...")
	if err := l.localNode.sendPeerInfo(c); err != nil {
		return err
	}
	rn.logger.Debug("Sent our peer info...")

	rn.logger.Debug("Waiting for peer info...")
	peerInfo, err := pb.DecodePeerInfo(c, string(l.localNode.discovery.network.ExportPublicKey()))
	if err != nil {
		return err
	}

	if peerInfo.Id == l.localNode.id {
		return errors.New("we can't add ourselves")
	}

	if l.localNode.netTableService.isConnected(peerInfo.Id) {
		logger.Debugf("We are already connected to %s", peerInfo.Id)
		// The caller only closes c when an error is returned, so this
		// duplicate connection would otherwise stay open forever. Best effort:
		// a failing Close leaves nothing further to do.
		c.Close()
		return nil
	}

	rn.logger.Debug("Received peer info...")
	rn.Initialize(peerInfo)
	l.localNode.netTableService.AddRemoteNode(rn)
	return nil
}