From 6b8644b5e7b5162dd374e7920b808da4f779c8dd Mon Sep 17 00:00:00 2001 From: Thomas Lin Date: Tue, 30 Jun 2020 10:23:04 -0400 Subject: [PATCH] Implemented support for TCP service chaining - Like the UDP version, the traffic is passed back along the path of VNFs in reverse order, but does not re-enter the VNFs. - Barring other improvements (erroneous error messages, stateful close, passing of errors back through the chain, etc.), this should commit should resolve the TCP/UDP case for #50. - As noted in #51, however, we currently pass payloads without headers, and proper tunneling that includes headers should be addressed within that issue. --- l4-proxy/chain-setup.go | 8 +- l4-proxy/l4-proxy.go | 9 +-- l4-proxy/tcp-proxy.go | 159 +++++++++++++++++++++++++++++++--------- 3 files changed, 131 insertions(+), 45 deletions(-) diff --git a/l4-proxy/chain-setup.go b/l4-proxy/chain-setup.go index 0cc3626..8d0f89e 100644 --- a/l4-proxy/chain-setup.go +++ b/l4-proxy/chain-setup.go @@ -502,7 +502,7 @@ func chainSetupHandler(stream network.Stream) { } else if foundMe == true && nextServ == "" { // This is the destination service // Return msg to previous proxy acknowledging setup - log.Printf("End of chain reached, sending SetupACK back\n") + log.Printf("End of chain reached, sending SetupACK back...\n") err = sendSetupACK(inSendRecv, REV_CHAIN_MSG_PREFIX + service) if err != nil { log.Printf("ERROR: Unable to send ACK to previous service\n%v\n", err) @@ -515,7 +515,7 @@ func chainSetupHandler(stream network.Stream) { udpEndChainHandler(stream) } else if tpProtoThis == "tcp" { stream.SetProtocol(tcpTunnelProtoID) - tcpTunnelHandler(stream) + tcpEndChainHandler(stream) } else { log.Printf("ERROR: Unknown transport protocol\n") } @@ -543,6 +543,7 @@ func chainSetupHandler(stream network.Stream) { outSendRecv := NewChainMsgCommunicator(outStream) // Forward chain setup request + log.Printf("Middle of chain reached, forwarding SetupRequest...\n") if err = sendSetupRequest(outSendRecv, chainSpec); err != nil { log.Printf("ERROR: sendSetupRequest() failed\n%v\n", err) return @@ -560,6 +561,7 @@ func chainSetupHandler(stream network.Stream) { if strings.HasPrefix(resMsg, REV_CHAIN_MSG_PREFIX) { resMsg += " " + service } + log.Printf("Chain SetupACK received, reverse forwarding SetupACK...\n") sendSetupACK(inSendRecv, resMsg) // Change protocol ID of input & output streams and invoke proper handler @@ -568,7 +570,7 @@ func chainSetupHandler(stream network.Stream) { udpMidChainHandler(stream, outStream) } else if tpProtoThis == "tcp" { outStream.SetProtocol(tcpTunnelProtoID) - tcpTunnelHandler(stream) + tcpMidChainHandler(stream, outStream) } else { log.Printf("ERROR: Unknown transport protocol\n") } diff --git a/l4-proxy/l4-proxy.go b/l4-proxy/l4-proxy.go index 21a1427..efc7bf4 100644 --- a/l4-proxy/l4-proxy.go +++ b/l4-proxy/l4-proxy.go @@ -62,11 +62,11 @@ var cache pcache.PeerCache type Forwarder struct { // Use TCPAddr instead for endpoint addresses? ListenAddr string - tcpWorker func(net.Listener, peer.ID) + tcpWorker func(net.Listener, []string) udpWorker func(*net.UDPConn, []string) } -// Maps a remote addr to existing ServiceProxy for that addr +// Maps a remote addr to existing Forwarder for that addr var serv2Fwd = make(map[string]Forwarder) func findOrAllocate(serviceHash, dockerHash string) (peer.ID, error) { @@ -234,10 +234,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) { // Start separate goroutine for handling connections and proxying to service var listenAddr string if tpProto == "tcp" { - //listenAddr, err = openTCPProxy(peerProxyID) - http.Error(w, "TCP chaining currently disabled", http.StatusMethodNotAllowed) - log.Printf("ERROR: TCP chaining currently disabled\n") - return + listenAddr, err = openTCPProxy(chainSpec) } else if tpProto == "udp" { listenAddr, err = openUDPProxy(chainSpec) } else { diff --git a/l4-proxy/tcp-proxy.go b/l4-proxy/tcp-proxy.go index 6c3c03e..0b86c6e 100644 --- a/l4-proxy/tcp-proxy.go +++ b/l4-proxy/tcp-proxy.go @@ -1,71 +1,141 @@ package main import ( + "errors" "fmt" + "io" "log" "net" + "strings" "syscall" "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" - - "github.com/t-lin/go-libp2p-gostream" ) var tcpTunnelProtoID = protocol.ID("/LCATunnelTCP/1.0") -// TCP data forwarder -// Simplified version of pipe() from https://github.com/jpillora/go-tcp-proxy/blob/master/proxy.go -func tcpFwdData(src, dst net.Conn) { +const MAX_TCP_TUNNEL_PAYLOAD = 0xffff - 20 - 20 // Max uint16 - Min TCP header size - IP header size + +// TODO: Only difference between this and udpFwdStream2Conn is +// the dstAddr param and WriteTo() in the body... can they +// be somehow merged? +func tcpFwdStream2Conn(src network.Stream, dst net.Conn) { + // Not shared amongst multiple clients, safe to close upstream connection + defer func() { + log.Printf("Closing connection %s <=> %s\n", dst.LocalAddr(), dst.RemoteAddr()) + src.Reset() + dst.Close() + }() + var err error var nBytes, nBytesW, n int - buf := make([]byte, 0xffff) // 64k buffer + var data []byte + + srcComm := NewChainMsgCommunicator(src) + for { + if data, err = receiveData(srcComm); err != nil { + log.Printf("ERROR: %v\n", err) + if errors.Is(err, syscall.EINVAL) || errors.Is(err, io.EOF) { + log.Printf("Connection %s <=> %s closed by remote", + src.Conn().LocalPeer(), src.Conn().RemotePeer()) + } else { + log.Printf("Unable to read from connection %s <=> %s\n%v\n", + src.Conn().LocalPeer(), src.Conn().RemotePeer(), err) + } + return + } + + nBytes = len(data) nBytesW = 0 + for nBytesW < nBytes { + n, err = dst.Write(data) + if err != nil { + if err == syscall.EINVAL { + log.Printf("Connection %s <=> %s closed", dst.LocalAddr(), dst.RemoteAddr()) + } else { + log.Printf("ERROR: Unable to write to TCP connection %s <=> %s\n%v\n", + dst.LocalAddr(), dst.RemoteAddr(), err) + } + return + } + + nBytesW += n + } + } +} + +// TODO: This now seems identical to udpFwdConn2Stream... except payload size. Merge the two? +func tcpFwdConn2Stream(src net.Conn, dst network.Stream) { + defer func() { + log.Printf("Closing connection %s <=> %s\n", + dst.Conn().LocalPeer(), dst.Conn().RemotePeer()) + dst.Reset() + src.Close() + }() + + var err error + var nBytes int + buf := make([]byte, MAX_TCP_TUNNEL_PAYLOAD) + + dstComm := NewChainMsgCommunicator(dst) + for { // NOTE: Using io.Copy or io.CopyBuffer slows down *a lot* after 2-3 runs. // Not clear why right now. Thus, do the copy manually. nBytes, err = src.Read(buf) if err != nil { - log.Printf("Connection %s <=> %s closed", src.LocalAddr(), src.RemoteAddr()) + if errors.Is(err, syscall.EINVAL) || errors.Is(err, io.EOF) { + log.Printf("Connection %s <=> %s closed by remote", src.LocalAddr(), src.RemoteAddr()) + } else { + log.Printf("Unable to read from TCP connection %s <=> %s\n%v\n", + src.LocalAddr(), src.RemoteAddr(), err) + } return } data := buf[:nBytes] - for nBytesW < nBytes { - n, err = dst.Write(data) - if err != nil { - log.Printf("Connection %s <=> %s closed", dst.LocalAddr(), dst.RemoteAddr()) - return + if err = sendData(dstComm, data); err != nil { + if errors.Is(err, syscall.EINVAL) { + log.Printf("Connection %s <=> %s closed", + dst.Conn().LocalPeer(), dst.Conn().RemotePeer()) + } else { + log.Printf("ERROR: Unable to write to connection %s <=> %s\n%v\n", + dst.Conn().LocalPeer(), dst.Conn().RemotePeer(), err) } - - nBytesW += n + return } } } // Connection forwarder -func tcpFwdConnToServ(lConn net.Conn, targetPeer peer.ID) { - defer lConn.Close() +// This function is separate from tcpServiceProxy() so as to avoid HOL +// blocking in the event the chain setup takes too long and another +// client wants to connect. +func tcpFwdConnToServ(lConn net.Conn, chainSpec []string) { log.Printf("Accepted TCP conn: %s <=> %s\n", lConn.LocalAddr(), lConn.RemoteAddr()) - p2pNode := manager.Host - rConn, err := gostream.Dial(p2pNode.Ctx, p2pNode.Host, targetPeer, tcpTunnelProtoID) + rConn, err := setupChain(chainSpec) if err != nil { - log.Printf("ERROR: Unable to dial target peer %s\n%v\n", targetPeer, err) + log.Printf("ERROR: Unable to set up chain %s\n%v\n", chainSpec, err) return } - defer rConn.Close() // Forward data in each direction - go tcpFwdData(lConn, rConn) - tcpFwdData(rConn, lConn) + // Closing will be done within the tcpFwd* functions + go tcpFwdConn2Stream(lConn, rConn) + go tcpFwdStream2Conn(rConn, lConn) } -// Implementation of ServiceProxy -func tcpServiceProxy(listen net.Listener, targetPeer peer.ID) { - defer listen.Close() +// Implementation of TCP service proxy. Invoked by the source proxy in the +// chain. Responsible for setting up the chain and passing the ingress stream +// to the appropriate data forwarders. +func tcpServiceProxy(listen net.Listener, chainSpec []string) { + defer func() { + log.Printf("Shutting down proxy for chain %s\n", chainSpec) + listen.Close() + }() for { conn, err := listen.Accept() @@ -79,7 +149,7 @@ func tcpServiceProxy(listen net.Listener, targetPeer peer.ID) { } // Forward connection - go tcpFwdConnToServ(conn, targetPeer) + go tcpFwdConnToServ(conn, chainSpec) } } @@ -90,9 +160,9 @@ func tcpServiceProxy(listen net.Listener, targetPeer peer.ID) { // TODO: In the future, perhaps the connection type info (TCP/UDP) can be stored in hash-lookup? // // Returns a string of the new TCP listening endpoint address -func openTCPProxy(servicePeer peer.ID) (string, error) { +func openTCPProxy(chainSpec []string) (string, error) { var listenAddr string - serviceKey := "tcp://" + string(servicePeer) + serviceKey := strings.Join(chainSpec, "/") if _, exists := serv2Fwd[serviceKey]; !exists { listen, err := net.Listen("tcp", ctrlHost + ":") // automatically choose port if err != nil { @@ -104,7 +174,7 @@ func openTCPProxy(servicePeer peer.ID) (string, error) { ListenAddr: listenAddr, tcpWorker: tcpServiceProxy, } - go serv2Fwd[serviceKey].tcpWorker(listen, servicePeer) + go serv2Fwd[serviceKey].tcpWorker(listen, chainSpec) } else { listenAddr = serv2Fwd[serviceKey].ListenAddr } @@ -114,10 +184,26 @@ func openTCPProxy(servicePeer peer.ID) (string, error) { // Handler for tcpTunnelProtoID (i.e. invoked at destination proxy) // Make a connection to the local service and forward data to/from it -func tcpTunnelHandler(stream network.Stream) { - lConn := gostream.NewConn(stream) - defer lConn.Close() +func tcpEndChainHandler(stream network.Stream) { + // Resolve and open connection to destination service + rAddr, err := net.ResolveTCPAddr("tcp", servEndpoint) + if err != nil { + log.Printf("ERROR: Unable to resolve target address %s\n%v\n", servEndpoint, err) + return + } + + rConn, err := net.DialTCP("tcp", nil, rAddr) + if err != nil { + log.Printf("ERROR: Unable to dial target address %s\n%v\n", servEndpoint, err) + return + } + + // Forward data in each direction + go tcpFwdStream2Conn(stream, rConn) + go tcpFwdConn2Stream(rConn, stream) +} +func tcpMidChainHandler(inStream, outStream network.Stream) { // Resolve and open connection to destination service rAddr, err := net.ResolveTCPAddr("tcp", servEndpoint) if err != nil { @@ -130,9 +216,10 @@ func tcpTunnelHandler(stream network.Stream) { log.Printf("ERROR: Unable to dial target address %s\n%v\n", servEndpoint, err) return } - defer rConn.Close() // Forward data in each direction - go tcpFwdData(lConn, rConn) - tcpFwdData(rConn, lConn) + go tcpFwdStream2Conn(inStream, rConn) + go tcpFwdConn2Stream(rConn, outStream) + go fwdStream2Stream(outStream, inStream) } +