Skip to content
This repository has been archived by the owner on Jan 9, 2024. It is now read-only.

Commit

Permalink
Fix a critical ID collision bug (issue #199)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jun Chen committed Sep 4, 2018
1 parent 16d8df5 commit 415548f
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 22 deletions.
10 changes: 5 additions & 5 deletions stream-multiplexer/egress.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

// EgressServer takes data from a go channel and recreates the multiplexed TCP streams
type EgressServer struct {
activeConnections map[string]*MultiplexedConnection
activeConnections map[uint32]*MultiplexedConnection
maxMessageSize int
maxPayloadSize int
upstreamChan chan []byte
Expand All @@ -29,7 +29,7 @@ func StartEgressHandler(serverAddress string, maxMessageSize int, upstreamChan c
eg.upstreamChan = upstreamChan
eg.downstreamChan = downstreamChan
eg.stopChan = stopChan
eg.activeConnections = make(map[string]*MultiplexedConnection)
eg.activeConnections = make(map[uint32]*MultiplexedConnection)
eg.verbose = verbose

if verbose {
Expand All @@ -51,7 +51,7 @@ func StartEgressHandler(serverAddress string, maxMessageSize int, upstreamChan c
continue
}

ID := string(dataRead[0:4])
ID := binary.BigEndian.Uint32(dataRead[0:4])
size := int(binary.BigEndian.Uint32(dataRead[4:8]))
data := dataRead[8:]

Expand All @@ -72,11 +72,11 @@ func StartEgressHandler(serverAddress string, maxMessageSize int, upstreamChan c
serverAddress, "? You need one!", err)
continue
} else {

mc := new(MultiplexedConnection)
mc.conn = c
mc.ID = ID
mc.ID_bytes = []byte(ID)
mc.ID_bytes = make([]byte, 4)
binary.BigEndian.PutUint32(mc.ID_bytes, ID)
mc.stopChan = make(chan bool, 1)
mc.maxMessageLength = eg.maxMessageSize

Expand Down
31 changes: 14 additions & 17 deletions stream-multiplexer/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"strconv"

"bytes"
"crypto/rand"
"encoding/binary"
"encoding/hex"
Expand All @@ -22,7 +21,7 @@ const MULTIPLEXER_HEADER_SIZE = 8
// MultiplexedConnection represents a TCP connections to which we assigned
// a stream ID
type MultiplexedConnection struct {
ID string
ID uint32
ID_bytes []byte
conn net.Conn
stopChan chan bool
Expand All @@ -33,7 +32,7 @@ type MultiplexedConnection struct {
// over go channels
type IngressServer struct {
activeConnectionsLock sync.Locker
activeConnections []*MultiplexedConnection
activeConnections map[uint32]*MultiplexedConnection
socketListener *net.TCPListener
maxMessageSize int
maxPayloadSize int
Expand All @@ -53,7 +52,7 @@ func StartIngressServer(port int, maxMessageSize int, upstreamChan chan []byte,
ig.stopChan = stopChan
ig.maxPayloadSize = maxMessageSize - MULTIPLEXER_HEADER_SIZE //we use 8 bytes for the multiplexing
ig.activeConnectionsLock = new(sync.Mutex)
ig.activeConnections = make([]*MultiplexedConnection, 0)
ig.activeConnections = make(map[uint32]*MultiplexedConnection)
ig.verbose = verbose
if verbose {
log.Lvl1("Ingress Server in verbose mode")
Expand Down Expand Up @@ -111,14 +110,14 @@ func StartIngressServer(port int, maxMessageSize int, upstreamChan chan []byte,
mc := new(MultiplexedConnection)
mc.conn = conn
mc.ID = id
ID_bytes := []byte(id)
mc.ID_bytes = ID_bytes[0:4]
mc.ID_bytes = make([]byte, 4)
binary.BigEndian.PutUint32(mc.ID_bytes, id)
mc.stopChan = make(chan bool, 1)
mc.maxMessageLength = ig.maxMessageSize

// lock the list before editing it
ig.activeConnectionsLock.Lock()
ig.activeConnections = append(ig.activeConnections, mc)
ig.activeConnections[id] = mc // TODO: IDs are generated by sampling 32 random bit, but they still may collide.
ig.activeConnectionsLock.Unlock()

// starts a handler that pours "mc.connection" into upstreamChan
Expand Down Expand Up @@ -150,15 +149,13 @@ func (ig *IngressServer) multiplexedChannelReader() {
data = data[0:length]
}

IDKey := binary.BigEndian.Uint32(ID)

ig.activeConnectionsLock.Lock()
for _, v := range ig.activeConnections {
if bytes.Equal(v.ID_bytes, ID) {
v.conn.Write(data)
break
}
if val, ok := ig.activeConnections[IDKey]; ok && (val != nil) && (val.conn != nil) {
val.conn.Write(data)
}
ig.activeConnectionsLock.Unlock()

}
}

Expand Down Expand Up @@ -188,7 +185,7 @@ func (ig *IngressServer) ingressConnectionReader(mc *MultiplexedConnection) {
return
}

log.Error("Ingress server: connectionReader error,", err)
log.Error("Ingress server: connectionReader error,", mc.ID, err)
return
}

Expand All @@ -207,9 +204,9 @@ func (ig *IngressServer) ingressConnectionReader(mc *MultiplexedConnection) {
}

//generateID generates an ID from a private key
func generateRandomID() string {
func generateRandomID() uint32 {
var n uint32
binary.Read(rand.Reader, binary.LittleEndian, &n)
binary.Read(rand.Reader, binary.BigEndian, &n)

return strconv.Itoa(int(n))
return n
}

0 comments on commit 415548f

Please sign in to comment.