Skip to content

Commit 2144049

Browse files
committed
webrtc: correctly report incoming packet address on muxed connection
1 parent 44db2b4 commit 2144049

File tree

2 files changed

+52
-32
lines changed

2 files changed

+52
-32
lines changed

p2p/transport/webrtc/udpmux/mux.go

+32-15
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"io"
88
"net"
9+
"strings"
910
"sync"
1011

1112
logging "github.com/ipfs/go-log/v2"
@@ -42,9 +43,15 @@ type UDPMux struct {
4243

4344
queue chan Candidate
4445

45-
mx sync.Mutex
46+
mx sync.Mutex
47+
// ufragMap allows us to multiplex incoming STUN packets based on ufrag
4648
ufragMap map[ufragConnKey]*muxedConnection
47-
addrMap map[string]*muxedConnection
49+
// addrMap allows us to correctly direct incoming packets after the connection
50+
// is established and ufrag isn't available on all packets
51+
addrMap map[string]*muxedConnection
52+
// ufragAddrMap allows us to clean up all addresses from the addrMap once
53+
// connection is closed
54+
ufragAddrMap map[ufragConnKey][]net.Addr
4855

4956
// the context controls the lifecycle of the mux
5057
wg sync.WaitGroup
@@ -57,12 +64,13 @@ var _ ice.UDPMux = &UDPMux{}
5764
func NewUDPMux(socket net.PacketConn) *UDPMux {
5865
ctx, cancel := context.WithCancel(context.Background())
5966
mux := &UDPMux{
60-
ctx: ctx,
61-
cancel: cancel,
62-
socket: socket,
63-
ufragMap: make(map[ufragConnKey]*muxedConnection),
64-
addrMap: make(map[string]*muxedConnection),
65-
queue: make(chan Candidate, 32),
67+
ctx: ctx,
68+
cancel: cancel,
69+
socket: socket,
70+
ufragMap: make(map[ufragConnKey]*muxedConnection),
71+
addrMap: make(map[string]*muxedConnection),
72+
ufragAddrMap: make(map[ufragConnKey][]net.Addr),
73+
queue: make(chan Candidate, 32),
6674
}
6775

6876
return mux
@@ -130,7 +138,11 @@ func (mux *UDPMux) readLoop() {
130138

131139
n, addr, err := mux.socket.ReadFrom(buf)
132140
if err != nil {
133-
log.Errorf("error reading from socket: %v", err)
141+
if strings.Contains(err.Error(), "use of closed network connection") {
142+
log.Debugf("readLoop exiting: socket %s closed", mux.socket.LocalAddr())
143+
} else {
144+
log.Errorf("error reading from socket %s: %v", mux.socket.LocalAddr(), err)
145+
}
134146
pool.Put(buf)
135147
return
136148
}
@@ -157,7 +169,7 @@ func (mux *UDPMux) processPacket(buf []byte, addr net.Addr) (processed bool) {
157169
conn, ok := mux.addrMap[addr.String()]
158170
mux.mx.Unlock()
159171
if ok {
160-
if err := conn.Push(buf); err != nil {
172+
if err := conn.Push(buf, addr); err != nil {
161173
log.Debugf("could not push packet: %v", err)
162174
return false
163175
}
@@ -196,7 +208,7 @@ func (mux *UDPMux) processPacket(buf []byte, addr net.Addr) (processed bool) {
196208
}
197209
}
198210

199-
if err := conn.Push(buf); err != nil {
211+
if err := conn.Push(buf, addr); err != nil {
200212
log.Debugf("could not push packet: %v", err)
201213
return false
202214
}
@@ -250,9 +262,12 @@ func (mux *UDPMux) RemoveConnByUfrag(ufrag string) {
250262

251263
for _, isIPv6 := range [...]bool{true, false} {
252264
key := ufragConnKey{ufrag: ufrag, isIPv6: isIPv6}
253-
if conn, ok := mux.ufragMap[key]; ok {
265+
if _, ok := mux.ufragMap[key]; ok {
254266
delete(mux.ufragMap, key)
255-
delete(mux.addrMap, conn.RemoteAddr().String())
267+
for _, addr := range mux.ufragAddrMap[key] {
268+
delete(mux.addrMap, addr.String())
269+
}
270+
delete(mux.ufragAddrMap, key)
256271
}
257272
}
258273
}
@@ -264,12 +279,14 @@ func (mux *UDPMux) getOrCreateConn(ufrag string, isIPv6 bool, _ *UDPMux, addr ne
264279
defer mux.mx.Unlock()
265280

266281
if conn, ok := mux.ufragMap[key]; ok {
282+
mux.addrMap[addr.String()] = conn
283+
mux.ufragAddrMap[key] = append(mux.ufragAddrMap[key], addr)
267284
return false, conn
268285
}
269286

270-
conn := newMuxedConnection(mux, func() { mux.RemoveConnByUfrag(ufrag) }, addr)
287+
conn := newMuxedConnection(mux, func() { mux.RemoveConnByUfrag(ufrag) })
271288
mux.ufragMap[key] = conn
272289
mux.addrMap[addr.String()] = conn
273-
290+
mux.ufragAddrMap[key] = append(mux.ufragAddrMap[key], addr)
274291
return true, conn
275292
}

p2p/transport/webrtc/udpmux/muxed_connection.go

+20-17
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@ import (
99
pool "github.com/libp2p/go-buffer-pool"
1010
)
1111

12+
type packet struct {
13+
buf []byte
14+
addr net.Addr
15+
}
16+
1217
var _ net.PacketConn = &muxedConnection{}
1318

1419
const queueLen = 128
@@ -21,48 +26,46 @@ type muxedConnection struct {
2126
ctx context.Context
2227
cancel context.CancelFunc
2328
onClose func()
24-
queue chan []byte
25-
remote net.Addr
29+
queue chan packet
2630
mux *UDPMux
2731
}
2832

2933
var _ net.PacketConn = &muxedConnection{}
3034

31-
func newMuxedConnection(mux *UDPMux, onClose func(), remote net.Addr) *muxedConnection {
35+
func newMuxedConnection(mux *UDPMux, onClose func()) *muxedConnection {
3236
ctx, cancel := context.WithCancel(mux.ctx)
3337
return &muxedConnection{
3438
ctx: ctx,
3539
cancel: cancel,
36-
queue: make(chan []byte, queueLen),
40+
queue: make(chan packet, queueLen),
3741
onClose: onClose,
38-
remote: remote,
3942
mux: mux,
4043
}
4144
}
4245

43-
func (c *muxedConnection) Push(buf []byte) error {
46+
func (c *muxedConnection) Push(buf []byte, addr net.Addr) error {
4447
select {
4548
case <-c.ctx.Done():
4649
return errors.New("closed")
4750
default:
4851
}
4952
select {
50-
case c.queue <- buf:
53+
case c.queue <- packet{buf: buf, addr: addr}:
5154
return nil
5255
default:
5356
return errors.New("queue full")
5457
}
5558
}
5659

57-
func (c *muxedConnection) ReadFrom(p []byte) (int, net.Addr, error) {
60+
func (c *muxedConnection) ReadFrom(buf []byte) (int, net.Addr, error) {
5861
select {
59-
case buf := <-c.queue:
60-
n := copy(p, buf) // This might discard parts of the packet, if p is too short
61-
if n < len(buf) {
62-
log.Debugf("short read, had %d, read %d", len(buf), n)
62+
case p := <-c.queue:
63+
n := copy(buf, p.buf) // This might discard parts of the packet, if p is too short
64+
if n < len(p.buf) {
65+
log.Debugf("short read, had %d, read %d", len(p.buf), n)
6366
}
64-
pool.Put(buf)
65-
return n, c.remote, nil
67+
pool.Put(p.buf)
68+
return n, p.addr, nil
6669
case <-c.ctx.Done():
6770
return 0, nil, c.ctx.Err()
6871
}
@@ -83,15 +86,15 @@ func (c *muxedConnection) Close() error {
8386
// drain the packet queue
8487
for {
8588
select {
86-
case <-c.queue:
89+
case p := <-c.queue:
90+
pool.Put(p.buf)
8791
default:
8892
return nil
8993
}
9094
}
9195
}
9296

93-
func (c *muxedConnection) LocalAddr() net.Addr { return c.mux.socket.LocalAddr() }
94-
func (c *muxedConnection) RemoteAddr() net.Addr { return c.remote }
97+
func (c *muxedConnection) LocalAddr() net.Addr { return c.mux.socket.LocalAddr() }
9598

9699
func (*muxedConnection) SetDeadline(t time.Time) error {
97100
// no deadline is desired here

0 commit comments

Comments
 (0)