Skip to content

Commit

Permalink
Implemented support for UDP service chaining
Browse files Browse the repository at this point in the history
  - Removed registration of UDP and TCP handler functions to their
    protocol IDs. Now there's only the service chain protocol ID.
    Depending on the chain specs, after the chain as been set up, the
    stream will be manually passed to the proper UDP or TCP handler
    function for handling the passing of traffic.
  - The current chain passes data back along the path of VNFs in reverse
    order, but does not re-enter the VNFs for processing.

TCP service chaining currently not implemented and has been disabled.
  • Loading branch information
t-lin committed Jun 30, 2020
1 parent a3b7bdf commit dd3b073
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 120 deletions.
83 changes: 77 additions & 6 deletions l4-proxy/chain-setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ package main
import (
"bytes"
"encoding/gob"
"errors"
"fmt"
"io"
"log"
"strings"
"syscall"

"github.com/libp2p/go-msgio"
"github.com/libp2p/go-libp2p-core/network"
Expand Down Expand Up @@ -355,6 +358,49 @@ func receiveData(cmsr *chainMsgCommunicator) ([]byte, error) {
return msgBytes, nil
}

func fwdStream2Stream(src, dst network.Stream) {
defer func() {
log.Printf("Closing connection %s <=> %s\n",
dst.Conn().LocalPeer(), dst.Conn().RemotePeer())
dst.Reset()
src.Reset()
}()

// Ideally, we can directly forward the connection's bytes. This would
// be most performant. However, we choose to decode and re-encode to be
// able to intercept errors that occur and print messages. This is useful
// for debugging.
var err error
var msgBytes []byte
srcComm := NewChainMsgCommunicator(src)
dstComm := NewChainMsgCommunicator(dst)

for {
if msgBytes, err = receiveData(srcComm); err != nil {
log.Printf("ERROR: %v\n", err)
if errors.Is(err, syscall.EINVAL) || errors.Is(err, io.EOF) {
log.Printf("Connection %s <=> %s closed by remote",
src.Conn().LocalPeer(), src.Conn().RemotePeer())
} else {
log.Printf("Unable to read from connection %s <=> %s\n%v\n",
src.Conn().LocalPeer(), src.Conn().RemotePeer(), err)
}
return
}

if err = sendData(dstComm, msgBytes); err != nil {
if errors.Is(err, syscall.EINVAL) {
log.Printf("Connection %s <=> %s closed",
dst.Conn().LocalPeer(), dst.Conn().RemotePeer())
} else {
log.Printf("ERROR: Unable to write to connection %s <=> %s\n%v\n",
dst.Conn().LocalPeer(), dst.Conn().RemotePeer(), err)
}
return
}
}
}

// Function for source (client) proxy to begin chain setup operation
func setupChain(chainSpec []string) (network.Stream, error) {
if len(chainSpec) < 2 {
Expand Down Expand Up @@ -417,7 +463,6 @@ func setupChain(chainSpec []string) (network.Stream, error) {

// Handler for chainSetupProtoID (i.e. invoked at destination proxy)
func chainSetupHandler(stream network.Stream) {
defer stream.Close()
var err error

// Input stream sender/receiver
Expand All @@ -429,19 +474,22 @@ func chainSetupHandler(stream network.Stream) {
return
}

// Find ourself (the service this proxy represents) in the chain,
// and open connection to next service's proxy
// Find ourself (the service this proxy represents) in the chain, what
// transport protocol we should be using, and the next service in the
// (if it exists).
// - Example chain spec: /tcp/service1/service2/udp/service3
// TODO: Right now we assume a service is specified ONLY ONCE in the chain
// If we allow multiple occurrences, this will need to be re-designed
// TODO: PREVENT LOOPED CHAIN SPECS!
var tpProto string
var tpProto, tpProtoThis string
var nextServ string
foundMe := false
for _, token := range chainSpec {
if token == "udp" || token == "tcp" {
tpProto = token
} else if token == service { // 'service' is currently global
foundMe = true
tpProtoThis = tpProto
} else if foundMe == true {
nextServ = token
break
Expand All @@ -458,22 +506,34 @@ func chainSetupHandler(stream network.Stream) {
err = sendSetupACK(inSendRecv, REV_CHAIN_MSG_PREFIX + service)
if err != nil {
log.Printf("ERROR: Unable to send ACK to previous service\n%v\n", err)
return
}

// Change protocol ID of input stream and invoke proper handler
if tpProtoThis == "udp" {
stream.SetProtocol(udpTunnelProtoID)
udpEndChainHandler(stream)
} else if tpProtoThis == "tcp" {
stream.SetProtocol(tcpTunnelProtoID)
tcpTunnelHandler(stream)
} else {
log.Printf("ERROR: Unknown transport protocol\n")
}

return
}

// If the chain extends beyond this service, keep going.
// Dial the next service and forward the chain setup message.
log.Printf("The next service is: %s %s\n", tpProto, nextServ)

log.Println("Looking for service with name", nextServ, "in hash-lookup")
peerProxyID, err := resolveService(nextServ)
if err != nil {
log.Printf("ERROR: Unable to resolve service %s\n%v\n", nextServ, err)
return
}

// Create output stream sender/receiver
// Create output stream sender/receiver to next service
var outStream network.Stream
if outStream, err = createStream(peerProxyID, chainSetupProtoID); err != nil {
log.Printf("ERROR: Unable to dial target peer %s\n%v\n", peerProxyID, err)
Expand Down Expand Up @@ -502,5 +562,16 @@ func chainSetupHandler(stream network.Stream) {
}
sendSetupACK(inSendRecv, resMsg)

// Change protocol ID of input & output streams and invoke proper handler
if tpProtoThis == "udp" {
outStream.SetProtocol(udpTunnelProtoID)
udpMidChainHandler(stream, outStream)
} else if tpProtoThis == "tcp" {
outStream.SetProtocol(tcpTunnelProtoID)
tcpTunnelHandler(stream)
} else {
log.Printf("ERROR: Unknown transport protocol\n")
}

return
}
49 changes: 8 additions & 41 deletions l4-proxy/l4-proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type Forwarder struct {
// Use TCPAddr instead for endpoint addresses?
ListenAddr string
tcpWorker func(net.Listener, peer.ID)
udpWorker func(network.Network, *net.UDPConn, peer.ID)
udpWorker func(*net.UDPConn, []string)
}

// Maps a remote addr to existing ServiceProxy for that addr
Expand Down Expand Up @@ -198,6 +198,8 @@ func resolveService(servName string) (peer.ID, error) {

// Handles the setting up proxies to services
func requestHandler(w http.ResponseWriter, r *http.Request) {
var err error

log.Println("Got request:", r.URL.RequestURI())

// 1. Find service information and arguments from URL
Expand All @@ -218,16 +220,6 @@ func requestHandler(w http.ResponseWriter, r *http.Request) {
return
}

// Check if this is a service chain, and set it up
_, err := setupChain(chainSpec)
if err != nil {
http.Error(w, "Bad Request", http.StatusBadRequest)
log.Printf("ERROR: Chain setup failed\n%v\n", err)
return
}

return // REMOVE LATER

// Since we're in the HTTP handler, we know the chain has not been set up
tpProto := chainSpec[0]
switch tpProto {
Expand All @@ -238,35 +230,16 @@ func requestHandler(w http.ResponseWriter, r *http.Request) {
return
}

serviceName := chainSpec[1]
log.Printf("Requested protocol is: %s\n", tpProto)
log.Printf("Requested service is: %s\n", serviceName)

log.Println("Looking for service with name", serviceName, "in hash-lookup")
info, err := registry.GetServiceWithHostRouting(
manager.Host.Ctx, manager.Host.Host,
manager.Host.RoutingDiscovery, serviceName,
)
if err != nil {
http.Error(w, "404 Not Found in hash lookup", http.StatusNotFound)
fmt.Fprintf(w, "%s\n", err)
log.Printf("ERROR: Hash lookup failed\n%s\n", err)
return
}

peerProxyID, err := findOrAllocate(info.ContentHash, info.DockerHash)
if err != nil {
http.Error(w, "404 Service Not Found", http.StatusNotFound)
return
}

// Open TCP/UDP tunnel to service and open local TCP listening port
// Start separate goroutine for handling connections and proxying to service
var listenAddr string
if tpProto == "tcp" {
listenAddr, err = openTCPProxy(peerProxyID)
//listenAddr, err = openTCPProxy(peerProxyID)
http.Error(w, "TCP chaining currently disabled", http.StatusMethodNotAllowed)
log.Printf("ERROR: TCP chaining currently disabled\n")
return
} else if tpProto == "udp" {
listenAddr, err = openUDPProxy(peerProxyID)
listenAddr, err = openUDPProxy(chainSpec)
} else {
// Should not get here
http.Error(w, "Unknown transport protocol", http.StatusInternalServerError)
Expand Down Expand Up @@ -425,12 +398,6 @@ func main() {
nodeConfig.BootstrapPeers = *bootstraps
nodeConfig.PSK = *psk

// Set up TCP and UDP handlers
nodeConfig.HandlerProtocolIDs = append(nodeConfig.HandlerProtocolIDs, tcpTunnelProtoID)
nodeConfig.StreamHandlers = append(nodeConfig.StreamHandlers, tcpTunnelHandler)
nodeConfig.HandlerProtocolIDs = append(nodeConfig.HandlerProtocolIDs, udpTunnelProtoID)
nodeConfig.StreamHandlers = append(nodeConfig.StreamHandlers, udpTunnelHandler)

// Set up chain setup handler
nodeConfig.HandlerProtocolIDs = append(nodeConfig.HandlerProtocolIDs, chainSetupProtoID)
nodeConfig.StreamHandlers = append(nodeConfig.StreamHandlers, chainSetupHandler)
Expand Down
Loading

0 comments on commit dd3b073

Please sign in to comment.