diff --git a/stream-multiplexer/egress.go b/stream-multiplexer/egress.go index 4df115b93..7f9aa0c13 100644 --- a/stream-multiplexer/egress.go +++ b/stream-multiplexer/egress.go @@ -12,7 +12,7 @@ import ( // EgressServer takes data from a go channel and recreates the multiplexed TCP streams type EgressServer struct { - activeConnections map[string]*MultiplexedConnection + activeConnections map[uint32]*MultiplexedConnection maxMessageSize int maxPayloadSize int upstreamChan chan []byte @@ -29,7 +29,7 @@ func StartEgressHandler(serverAddress string, maxMessageSize int, upstreamChan c eg.upstreamChan = upstreamChan eg.downstreamChan = downstreamChan eg.stopChan = stopChan - eg.activeConnections = make(map[string]*MultiplexedConnection) + eg.activeConnections = make(map[uint32]*MultiplexedConnection) eg.verbose = verbose if verbose { @@ -51,7 +51,7 @@ func StartEgressHandler(serverAddress string, maxMessageSize int, upstreamChan c continue } - ID := string(dataRead[0:4]) + ID := binary.BigEndian.Uint32(dataRead[0:4]) size := int(binary.BigEndian.Uint32(dataRead[4:8])) data := dataRead[8:] @@ -72,11 +72,11 @@ func StartEgressHandler(serverAddress string, maxMessageSize int, upstreamChan c serverAddress, "? You need one!", err) continue } else { - mc := new(MultiplexedConnection) mc.conn = c mc.ID = ID - mc.ID_bytes = []byte(ID) + mc.ID_bytes = make([]byte, 4) + binary.BigEndian.PutUint32(mc.ID_bytes, ID) mc.stopChan = make(chan bool, 1) mc.maxMessageLength = eg.maxMessageSize diff --git a/stream-multiplexer/ingress.go b/stream-multiplexer/ingress.go index 488723151..0bc07bed7 100644 --- a/stream-multiplexer/ingress.go +++ b/stream-multiplexer/ingress.go @@ -5,7 +5,6 @@ import ( "strconv" - "bytes" "crypto/rand" "encoding/binary" "encoding/hex" @@ -22,7 +21,7 @@ const MULTIPLEXER_HEADER_SIZE = 8 // MultiplexedConnection represents a TCP connections to which we assigned // a stream ID type MultiplexedConnection struct { - ID string + ID uint32 ID_bytes []byte conn net.Conn stopChan chan bool @@ -33,7 +32,7 @@ type MultiplexedConnection struct { // over go channels type IngressServer struct { activeConnectionsLock sync.Locker - activeConnections []*MultiplexedConnection + activeConnections map[uint32]*MultiplexedConnection socketListener *net.TCPListener maxMessageSize int maxPayloadSize int @@ -53,7 +52,7 @@ func StartIngressServer(port int, maxMessageSize int, upstreamChan chan []byte, ig.stopChan = stopChan ig.maxPayloadSize = maxMessageSize - MULTIPLEXER_HEADER_SIZE //we use 8 bytes for the multiplexing ig.activeConnectionsLock = new(sync.Mutex) - ig.activeConnections = make([]*MultiplexedConnection, 0) + ig.activeConnections = make(map[uint32]*MultiplexedConnection) ig.verbose = verbose if verbose { log.Lvl1("Ingress Server in verbose mode") @@ -111,14 +110,14 @@ func StartIngressServer(port int, maxMessageSize int, upstreamChan chan []byte, mc := new(MultiplexedConnection) mc.conn = conn mc.ID = id - ID_bytes := []byte(id) - mc.ID_bytes = ID_bytes[0:4] + mc.ID_bytes = make([]byte, 4) + binary.BigEndian.PutUint32(mc.ID_bytes, id) mc.stopChan = make(chan bool, 1) mc.maxMessageLength = ig.maxMessageSize // lock the list before editing it ig.activeConnectionsLock.Lock() - ig.activeConnections = append(ig.activeConnections, mc) + ig.activeConnections[id] = mc // TODO: IDs are generated by sampling 32 random bit, but they still may collide. ig.activeConnectionsLock.Unlock() // starts a handler that pours "mc.connection" into upstreamChan @@ -150,15 +149,13 @@ func (ig *IngressServer) multiplexedChannelReader() { data = data[0:length] } + IDKey := binary.BigEndian.Uint32(ID) + ig.activeConnectionsLock.Lock() - for _, v := range ig.activeConnections { - if bytes.Equal(v.ID_bytes, ID) { - v.conn.Write(data) - break - } + if val, ok := ig.activeConnections[IDKey]; ok && (val != nil) && (val.conn != nil) { + val.conn.Write(data) } ig.activeConnectionsLock.Unlock() - } } @@ -188,7 +185,7 @@ func (ig *IngressServer) ingressConnectionReader(mc *MultiplexedConnection) { return } - log.Error("Ingress server: connectionReader error,", err) + log.Error("Ingress server: connectionReader error,", mc.ID, err) return } @@ -207,9 +204,9 @@ func (ig *IngressServer) ingressConnectionReader(mc *MultiplexedConnection) { } //generateID generates an ID from a private key -func generateRandomID() string { +func generateRandomID() uint32 { var n uint32 - binary.Read(rand.Reader, binary.LittleEndian, &n) + binary.Read(rand.Reader, binary.BigEndian, &n) - return strconv.Itoa(int(n)) + return n }