Skip to content
This repository has been archived by the owner on May 27, 2021. It is now read-only.

Commit

Permalink
allocator: use ip-only permissions and port-bindings (#35)
Browse files Browse the repository at this point in the history
allocator: use ip-only permissions and port-bindings
  • Loading branch information
ernado authored May 17, 2019
2 parents caea8c7 + 161063d commit 7d3ba13
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 63 deletions.
36 changes: 28 additions & 8 deletions internal/allocator/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,48 @@ type PeerHandler interface {
HandlePeerData(d []byte, t turn.FiveTuple, a turn.Addr)
}

// Binding wraps channel binding port, channel number and timeout.
//
// The full transport address is permission ip + binding port.
type Binding struct {
Port int
Channel turn.ChannelNumber
Timeout time.Time
}

// Permission as described in "Permissions" section, mimics the
// address-restricted filtering mechanism of NAT's.
//
// Note that permission is per IP address, and bindings are per transport
// address. Permission should ignore port.
//
// See RFC 5766 Section 2.3
type Permission struct {
Addr turn.Addr
Timeout time.Time
Binding turn.ChannelNumber // 0 or valid channel number
IP net.IP
Timeout time.Time
Bindings []Binding
}

func (p Permission) String() string {
if p.Binding == 0 {
return fmt.Sprintf("%s [%s]", p.Addr, p.Timeout.Format(time.RFC3339))
if len(p.Bindings) == 0 {
return fmt.Sprintf("%s [%s]", p.IP, p.Timeout.Format(time.RFC3339))
}
return fmt.Sprintf("%s (0x%x) [%s]", p.Addr, int(p.Binding), p.Timeout.Format(time.RFC3339))
return fmt.Sprintf("%s (b:%d) [%s]", p.IP, len(p.Bindings), p.Timeout.Format(time.RFC3339))
}

func (p *Permission) conflicts(n turn.ChannelNumber, peer turn.Addr) bool {
if p.Addr.Equal(peer) && (p.Binding == n || p.Binding == 0) {
if p.IP.Equal(peer.IP) && len(p.Bindings) == 0 {
return false
}
if !p.IP.Equal(peer.IP) {
return false
}
return !p.Addr.Equal(peer) || p.Binding == n
for _, b := range p.Bindings {
if b.Port == peer.Port {
return b.Channel != n
}
}
return false
}

// Allocation as described in "Allocations" section.
Expand Down
13 changes: 6 additions & 7 deletions internal/allocator/allocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,16 @@ func TestFiveTuple_String(t *testing.T) {

func TestPermission_String(t *testing.T) {
p := Permission{
Addr: turn.Addr{
Port: 100,
IP: net.IPv4(127, 0, 0, 1),
},
IP: net.IPv4(127, 0, 0, 1),
Timeout: time.Date(2017, 1, 1, 1, 1, 1, 1, time.UTC),
}
if p.String() != "127.0.0.1:100 [2017-01-01T01:01:01Z]" {
if p.String() != "127.0.0.1 [2017-01-01T01:01:01Z]" {
t.Error("unexpected stringer output")
}
p.Binding = 0x4001
if p.String() != "127.0.0.1:100 (0x4001) [2017-01-01T01:01:01Z]" {
p.Bindings = []Binding{
{Port: 100, Channel: 0x4001},
}
if p.String() != "127.0.0.1 (b:1) [2017-01-01T01:01:01Z]" {
t.Error("unexpected stringer output")
}
}
Expand Down
133 changes: 86 additions & 47 deletions internal/allocator/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package allocator

import (
"fmt"
"net"
"sync"
"time"
Expand Down Expand Up @@ -101,16 +102,21 @@ func (a *Allocator) SendBound(tuple turn.FiveTuple, n turn.ChannelNumber, data [
continue
}
for _, p := range a.allocs[i].Permissions {
if p.Binding != n {
if len(p.Bindings) == 0 {
continue
}
conn = a.allocs[i].Conn
// Copy p.Addr to turn.Addr.
addr = turn.Addr{
Port: p.Addr.Port,
IP: make(net.IP, len(p.Addr.IP)),
for _, b := range p.Bindings {
if b.Channel != n {
continue
}
conn = a.allocs[i].Conn
// Copy p.Addr to turn.Addr.
addr = turn.Addr{
Port: b.Port,
IP: make(net.IP, len(p.IP)),
}
copy(addr.IP, p.IP)
}
copy(addr.IP, p.Addr.IP)
}
}
a.allocsMux.RUnlock()
Expand Down Expand Up @@ -150,7 +156,7 @@ func (a *Allocator) Send(tuple turn.FiveTuple, peer turn.Addr, data []byte) (int
continue
}
for _, p := range a.allocs[i].Permissions {
if !peer.Equal(p.Addr) {
if !peer.IP.Equal(p.IP) {
continue
}
conn = a.allocs[i].Conn
Expand Down Expand Up @@ -210,6 +216,13 @@ func (a *Allocator) Prune(t time.Time) {
for i := range a.allocs {
var newPermissions []Permission
for _, p := range a.allocs[i].Permissions {
var newBindings []Binding
for _, b := range p.Bindings {
if b.Timeout.After(t) {
newBindings = append(newBindings, b)
}
}
p.Bindings = newBindings
if p.Timeout.After(t) {
newPermissions = append(newPermissions, p)
continue
Expand Down Expand Up @@ -310,8 +323,8 @@ func (a *Allocator) New(tuple turn.FiveTuple, timeout time.Time, callback PeerHa
func (a *Allocator) CreatePermission(tuple turn.FiveTuple, peer turn.Addr, timeout time.Time) error {
permission := Permission{
Timeout: timeout,
Addr: peer,
}
permission.IP = append(permission.IP, peer.IP...)
var (
found bool
updated bool
Expand All @@ -323,7 +336,7 @@ func (a *Allocator) CreatePermission(tuple turn.FiveTuple, peer turn.Addr, timeo
}
found = true
for k := range a.allocs[i].Permissions {
if !a.allocs[i].Permissions[k].Addr.Equal(peer) {
if !a.allocs[i].Permissions[k].IP.Equal(peer.IP) {
continue
}
// Updating.
Expand Down Expand Up @@ -354,63 +367,92 @@ func (a *Allocator) CreatePermission(tuple turn.FiveTuple, peer turn.Addr, timeo
// channel binding.
//
// Allocator implementation does not assume any default timeout.
func (a *Allocator) ChannelBind(
tuple turn.FiveTuple, n turn.ChannelNumber, peer turn.Addr, timeout time.Time,
) error {
func (a *Allocator) ChannelBind(tuple turn.FiveTuple, n turn.ChannelNumber, peer turn.Addr, timeout time.Time) error {
if !n.Valid() {
return turn.ErrInvalidChannelNumber
}
updated := false
found := false
allocFound := false
a.allocsMux.Lock()
defer a.allocsMux.Unlock()
for i := range a.allocs {
if !a.allocs[i].Tuple.Equal(tuple) {
continue
}
// Searching for existing binding.
// Searching for existing permission.
for k := range a.allocs[i].Permissions {
var (
cN = a.allocs[i].Permissions[k].Binding
cAddr = a.allocs[i].Permissions[k].Addr
)
if (cN != n || cN == 0) && !cAddr.Equal(peer) {
// Skipping permission for different peer turn.Address if it is unbound
// or has different channel number.
pIP := a.allocs[i].Permissions[k].IP
if !pIP.Equal(peer.IP) {
continue
}
// Checking for binding conflicts.
if a.allocs[i].Permissions[k].conflicts(n, peer) {
// There is existing binding with same channel number or peer turn.Address.
fmt.Printf("Conflict %+v: %d %s",
a.allocs[i].Permissions[k],
n, peer,
)
return ErrAllocationMismatch
}
a.allocs[i].Permissions[k].Timeout = timeout
a.allocs[i].Permissions[k].Binding = n
a.log.Debug("updated binding",
zap.Stringer("addr", peer),
zap.Stringer("tuple", tuple),
zap.Stringer("binding", n),
)
updated = true
for j := range a.allocs[i].Permissions[k].Bindings {
if a.allocs[i].Permissions[k].Bindings[j].Channel != n {
continue
}
// Updating existing binding and permission.
a.allocs[i].Permissions[k].Bindings[j].Timeout = timeout
if timeout.After(a.allocs[i].Permissions[k].Timeout) {
a.allocs[i].Permissions[k].Timeout = timeout
}
a.log.Debug("updated binding",
zap.Stringer("addr", peer),
zap.Stringer("tuple", tuple),
zap.Stringer("binding", n),
)
updated = true
break
}
if !updated {
// No binding found, creating new one.
a.log.Debug("created binding",
zap.Stringer("addr", peer),
zap.Stringer("tuple", tuple),
zap.Stringer("binding", n),
)
if timeout.After(a.allocs[i].Permissions[k].Timeout) {
a.allocs[i].Permissions[k].Timeout = timeout
}
a.allocs[i].Permissions[k].Bindings = append(a.allocs[i].Permissions[k].Bindings, Binding{
Port: peer.Port,
Channel: n,
Timeout: timeout,
})
}
found = true
break
}
if !updated {
// No binding found, creating new one.
a.log.Debug("created binding",
if !found {
// No permission found, creating new one.
a.log.Debug("created permission via binding",
zap.Stringer("addr", peer),
zap.Stringer("tuple", tuple),
zap.Stringer("binding", n),
)
a.allocs[i].Permissions = append(a.allocs[i].Permissions, Permission{
Addr: peer,
Binding: n,
IP: peer.IP,
Timeout: timeout,
Bindings: []Binding{
{
Timeout: timeout,
Channel: n,
Port: peer.Port,
},
},
})
}
found = true
break
allocFound = true
}
if !found {
if !allocFound {
// No allocation found.
return ErrAllocationMismatch
}
Expand All @@ -426,14 +468,14 @@ func (a *Allocator) Bound(tuple turn.FiveTuple, peer turn.Addr) (turn.ChannelNum
continue
}
for k := range a.allocs[i].Permissions {
var (
cN = a.allocs[i].Permissions[k].Binding
cAddr = a.allocs[i].Permissions[k].Addr
)
if !cAddr.Equal(peer) || cN == 0 {
if !a.allocs[i].Permissions[k].IP.Equal(peer.IP) {
continue
}
return cN, nil
for j := range a.allocs[i].Permissions[k].Bindings {
if a.allocs[i].Permissions[k].Bindings[j].Port == peer.Port {
return a.allocs[i].Permissions[k].Bindings[j].Channel, nil
}
}
}
}
return 0, ErrAllocationMismatch
Expand Down Expand Up @@ -473,10 +515,7 @@ func (a *Allocator) Stats() Stats {
for i := range a.allocs {
s.Permissions += len(a.allocs[i].Permissions)
for k := range a.allocs[i].Permissions {
if a.allocs[i].Permissions[k].Binding == 0 {
continue
}
s.Bindings++
s.Bindings += len(a.allocs[i].Permissions[k].Bindings)
}
}
a.allocsMux.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion internal/allocator/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestAllocator_New(t *testing.T) {
}
peer2 := turn.Addr{
Port: 202,
IP: net.IPv4(127, 0, 0, 1),
IP: net.IPv4(127, 0, 0, 2),
}
timeout := now.Add(time.Second * 10)
tuple := turn.FiveTuple{
Expand Down

0 comments on commit 7d3ba13

Please sign in to comment.