Skip to content

Commit

Permalink
Implemented support for TCP service chaining
Browse files Browse the repository at this point in the history
  - Like the UDP version, the traffic is passed back along the path of
    VNFs in reverse order, but does not re-enter the VNFs.
  - Barring other improvements (erroneous error messages, stateful
    close, passing of errors back through the chain, etc.), this
    should commit should resolve the TCP/UDP case for #50.
  - As noted in #51, however, we currently pass payloads without
    headers, and proper tunneling that includes headers should be
    addressed within that issue.
  • Loading branch information
t-lin committed Jun 30, 2020
1 parent dd3b073 commit 6b8644b
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 45 deletions.
8 changes: 5 additions & 3 deletions l4-proxy/chain-setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ func chainSetupHandler(stream network.Stream) {
} else if foundMe == true && nextServ == "" {
// This is the destination service
// Return msg to previous proxy acknowledging setup
log.Printf("End of chain reached, sending SetupACK back\n")
log.Printf("End of chain reached, sending SetupACK back...\n")
err = sendSetupACK(inSendRecv, REV_CHAIN_MSG_PREFIX + service)
if err != nil {
log.Printf("ERROR: Unable to send ACK to previous service\n%v\n", err)
Expand All @@ -515,7 +515,7 @@ func chainSetupHandler(stream network.Stream) {
udpEndChainHandler(stream)
} else if tpProtoThis == "tcp" {
stream.SetProtocol(tcpTunnelProtoID)
tcpTunnelHandler(stream)
tcpEndChainHandler(stream)
} else {
log.Printf("ERROR: Unknown transport protocol\n")
}
Expand Down Expand Up @@ -543,6 +543,7 @@ func chainSetupHandler(stream network.Stream) {
outSendRecv := NewChainMsgCommunicator(outStream)

// Forward chain setup request
log.Printf("Middle of chain reached, forwarding SetupRequest...\n")
if err = sendSetupRequest(outSendRecv, chainSpec); err != nil {
log.Printf("ERROR: sendSetupRequest() failed\n%v\n", err)
return
Expand All @@ -560,6 +561,7 @@ func chainSetupHandler(stream network.Stream) {
if strings.HasPrefix(resMsg, REV_CHAIN_MSG_PREFIX) {
resMsg += " " + service
}
log.Printf("Chain SetupACK received, reverse forwarding SetupACK...\n")
sendSetupACK(inSendRecv, resMsg)

// Change protocol ID of input & output streams and invoke proper handler
Expand All @@ -568,7 +570,7 @@ func chainSetupHandler(stream network.Stream) {
udpMidChainHandler(stream, outStream)
} else if tpProtoThis == "tcp" {
outStream.SetProtocol(tcpTunnelProtoID)
tcpTunnelHandler(stream)
tcpMidChainHandler(stream, outStream)
} else {
log.Printf("ERROR: Unknown transport protocol\n")
}
Expand Down
9 changes: 3 additions & 6 deletions l4-proxy/l4-proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ var cache pcache.PeerCache
type Forwarder struct {
// Use TCPAddr instead for endpoint addresses?
ListenAddr string
tcpWorker func(net.Listener, peer.ID)
tcpWorker func(net.Listener, []string)
udpWorker func(*net.UDPConn, []string)
}

// Maps a remote addr to existing ServiceProxy for that addr
// Maps a remote addr to existing Forwarder for that addr
var serv2Fwd = make(map[string]Forwarder)

func findOrAllocate(serviceHash, dockerHash string) (peer.ID, error) {
Expand Down Expand Up @@ -234,10 +234,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) {
// Start separate goroutine for handling connections and proxying to service
var listenAddr string
if tpProto == "tcp" {
//listenAddr, err = openTCPProxy(peerProxyID)
http.Error(w, "TCP chaining currently disabled", http.StatusMethodNotAllowed)
log.Printf("ERROR: TCP chaining currently disabled\n")
return
listenAddr, err = openTCPProxy(chainSpec)
} else if tpProto == "udp" {
listenAddr, err = openUDPProxy(chainSpec)
} else {
Expand Down
159 changes: 123 additions & 36 deletions l4-proxy/tcp-proxy.go
Original file line number Diff line number Diff line change
@@ -1,71 +1,141 @@
package main

import (
"errors"
"fmt"
"io"
"log"
"net"
"strings"
"syscall"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"

"github.com/t-lin/go-libp2p-gostream"
)

var tcpTunnelProtoID = protocol.ID("/LCATunnelTCP/1.0")

// TCP data forwarder
// Simplified version of pipe() from https://github.com/jpillora/go-tcp-proxy/blob/master/proxy.go
func tcpFwdData(src, dst net.Conn) {
const MAX_TCP_TUNNEL_PAYLOAD = 0xffff - 20 - 20 // Max uint16 - Min TCP header size - IP header size

// TODO: Only difference between this and udpFwdStream2Conn is
// the dstAddr param and WriteTo() in the body... can they
// be somehow merged?
func tcpFwdStream2Conn(src network.Stream, dst net.Conn) {
// Not shared amongst multiple clients, safe to close upstream connection
defer func() {
log.Printf("Closing connection %s <=> %s\n", dst.LocalAddr(), dst.RemoteAddr())
src.Reset()
dst.Close()
}()

var err error
var nBytes, nBytesW, n int
buf := make([]byte, 0xffff) // 64k buffer
var data []byte

srcComm := NewChainMsgCommunicator(src)

for {
if data, err = receiveData(srcComm); err != nil {
log.Printf("ERROR: %v\n", err)
if errors.Is(err, syscall.EINVAL) || errors.Is(err, io.EOF) {
log.Printf("Connection %s <=> %s closed by remote",
src.Conn().LocalPeer(), src.Conn().RemotePeer())
} else {
log.Printf("Unable to read from connection %s <=> %s\n%v\n",
src.Conn().LocalPeer(), src.Conn().RemotePeer(), err)
}
return
}

nBytes = len(data)
nBytesW = 0
for nBytesW < nBytes {
n, err = dst.Write(data)
if err != nil {
if err == syscall.EINVAL {
log.Printf("Connection %s <=> %s closed", dst.LocalAddr(), dst.RemoteAddr())
} else {
log.Printf("ERROR: Unable to write to TCP connection %s <=> %s\n%v\n",
dst.LocalAddr(), dst.RemoteAddr(), err)
}
return
}

nBytesW += n
}
}
}

// TODO: This now seems identical to udpFwdConn2Stream... except payload size. Merge the two?
func tcpFwdConn2Stream(src net.Conn, dst network.Stream) {
defer func() {
log.Printf("Closing connection %s <=> %s\n",
dst.Conn().LocalPeer(), dst.Conn().RemotePeer())
dst.Reset()
src.Close()
}()

var err error
var nBytes int
buf := make([]byte, MAX_TCP_TUNNEL_PAYLOAD)

dstComm := NewChainMsgCommunicator(dst)

for {
// NOTE: Using io.Copy or io.CopyBuffer slows down *a lot* after 2-3 runs.
// Not clear why right now. Thus, do the copy manually.
nBytes, err = src.Read(buf)
if err != nil {
log.Printf("Connection %s <=> %s closed", src.LocalAddr(), src.RemoteAddr())
if errors.Is(err, syscall.EINVAL) || errors.Is(err, io.EOF) {
log.Printf("Connection %s <=> %s closed by remote", src.LocalAddr(), src.RemoteAddr())
} else {
log.Printf("Unable to read from TCP connection %s <=> %s\n%v\n",
src.LocalAddr(), src.RemoteAddr(), err)
}
return
}
data := buf[:nBytes]

for nBytesW < nBytes {
n, err = dst.Write(data)
if err != nil {
log.Printf("Connection %s <=> %s closed", dst.LocalAddr(), dst.RemoteAddr())
return
if err = sendData(dstComm, data); err != nil {
if errors.Is(err, syscall.EINVAL) {
log.Printf("Connection %s <=> %s closed",
dst.Conn().LocalPeer(), dst.Conn().RemotePeer())
} else {
log.Printf("ERROR: Unable to write to connection %s <=> %s\n%v\n",
dst.Conn().LocalPeer(), dst.Conn().RemotePeer(), err)
}

nBytesW += n
return
}
}
}

// Connection forwarder
func tcpFwdConnToServ(lConn net.Conn, targetPeer peer.ID) {
defer lConn.Close()
// This function is separate from tcpServiceProxy() so as to avoid HOL
// blocking in the event the chain setup takes too long and another
// client wants to connect.
func tcpFwdConnToServ(lConn net.Conn, chainSpec []string) {
log.Printf("Accepted TCP conn: %s <=> %s\n", lConn.LocalAddr(), lConn.RemoteAddr())

p2pNode := manager.Host
rConn, err := gostream.Dial(p2pNode.Ctx, p2pNode.Host, targetPeer, tcpTunnelProtoID)
rConn, err := setupChain(chainSpec)
if err != nil {
log.Printf("ERROR: Unable to dial target peer %s\n%v\n", targetPeer, err)
log.Printf("ERROR: Unable to set up chain %s\n%v\n", chainSpec, err)
return
}
defer rConn.Close()

// Forward data in each direction
go tcpFwdData(lConn, rConn)
tcpFwdData(rConn, lConn)
// Closing will be done within the tcpFwd* functions
go tcpFwdConn2Stream(lConn, rConn)
go tcpFwdStream2Conn(rConn, lConn)
}

// Implementation of ServiceProxy
func tcpServiceProxy(listen net.Listener, targetPeer peer.ID) {
defer listen.Close()
// Implementation of TCP service proxy. Invoked by the source proxy in the
// chain. Responsible for setting up the chain and passing the ingress stream
// to the appropriate data forwarders.
func tcpServiceProxy(listen net.Listener, chainSpec []string) {
defer func() {
log.Printf("Shutting down proxy for chain %s\n", chainSpec)
listen.Close()
}()

for {
conn, err := listen.Accept()
Expand All @@ -79,7 +149,7 @@ func tcpServiceProxy(listen net.Listener, targetPeer peer.ID) {
}

// Forward connection
go tcpFwdConnToServ(conn, targetPeer)
go tcpFwdConnToServ(conn, chainSpec)
}
}

Expand All @@ -90,9 +160,9 @@ func tcpServiceProxy(listen net.Listener, targetPeer peer.ID) {
// TODO: In the future, perhaps the connection type info (TCP/UDP) can be stored in hash-lookup?
//
// Returns a string of the new TCP listening endpoint address
func openTCPProxy(servicePeer peer.ID) (string, error) {
func openTCPProxy(chainSpec []string) (string, error) {
var listenAddr string
serviceKey := "tcp://" + string(servicePeer)
serviceKey := strings.Join(chainSpec, "/")
if _, exists := serv2Fwd[serviceKey]; !exists {
listen, err := net.Listen("tcp", ctrlHost + ":") // automatically choose port
if err != nil {
Expand All @@ -104,7 +174,7 @@ func openTCPProxy(servicePeer peer.ID) (string, error) {
ListenAddr: listenAddr,
tcpWorker: tcpServiceProxy,
}
go serv2Fwd[serviceKey].tcpWorker(listen, servicePeer)
go serv2Fwd[serviceKey].tcpWorker(listen, chainSpec)
} else {
listenAddr = serv2Fwd[serviceKey].ListenAddr
}
Expand All @@ -114,10 +184,26 @@ func openTCPProxy(servicePeer peer.ID) (string, error) {

// Handler for tcpTunnelProtoID (i.e. invoked at destination proxy)
// Make a connection to the local service and forward data to/from it
func tcpTunnelHandler(stream network.Stream) {
lConn := gostream.NewConn(stream)
defer lConn.Close()
func tcpEndChainHandler(stream network.Stream) {
// Resolve and open connection to destination service
rAddr, err := net.ResolveTCPAddr("tcp", servEndpoint)
if err != nil {
log.Printf("ERROR: Unable to resolve target address %s\n%v\n", servEndpoint, err)
return
}

rConn, err := net.DialTCP("tcp", nil, rAddr)
if err != nil {
log.Printf("ERROR: Unable to dial target address %s\n%v\n", servEndpoint, err)
return
}

// Forward data in each direction
go tcpFwdStream2Conn(stream, rConn)
go tcpFwdConn2Stream(rConn, stream)
}

func tcpMidChainHandler(inStream, outStream network.Stream) {
// Resolve and open connection to destination service
rAddr, err := net.ResolveTCPAddr("tcp", servEndpoint)
if err != nil {
Expand All @@ -130,9 +216,10 @@ func tcpTunnelHandler(stream network.Stream) {
log.Printf("ERROR: Unable to dial target address %s\n%v\n", servEndpoint, err)
return
}
defer rConn.Close()

// Forward data in each direction
go tcpFwdData(lConn, rConn)
tcpFwdData(rConn, lConn)
go tcpFwdStream2Conn(inStream, rConn)
go tcpFwdConn2Stream(rConn, outStream)
go fwdStream2Stream(outStream, inStream)
}

0 comments on commit 6b8644b

Please sign in to comment.