Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MPTCP support #898

Open
wants to merge 5 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ type ChannelOptions struct {
// This is an unstable API - breaking changes are likely.
RelayTimerVerification bool

// EnableMPTCP enables MPTCP for TCP network connection to increase reliability.
// It requires underlying operating system support MPTCP.
// If EnableMPTCP is false or no MPTCP support, the connection will use normal TCP.
// It's set to false by default.
EnableMPTCP bool

// The reporter to use for reporting stats for this channel.
StatsReporter StatsReporter

Expand Down Expand Up @@ -184,6 +190,7 @@ type Channel struct {
relayMaxConnTimeout time.Duration
relayMaxTombs uint64
relayTimerVerify bool
enableMPTCP bool
internalHandlers *handlerMap
handler Handler
onPeerStatusChanged func(*Peer)
Expand Down Expand Up @@ -275,8 +282,12 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
return nil, err
}

// Default to dialContext if dialer is not passed in as an option
// Default to dialContext or dialMPTCPContex
// if dialer is not passed in as an option
dialCtx := dialContext
if opts.EnableMPTCP {
dialCtx = dialMPTCPContext
}
if opts.Dialer != nil {
dialCtx = func(ctx context.Context, hostPort string) (net.Conn, error) {
return opts.Dialer(ctx, "tcp", hostPort)
Comment on lines 292 to 294
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if I set both EnableMPTCP and Dialer?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we added a comment regarding this

Expand Down Expand Up @@ -306,6 +317,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
relayMaxConnTimeout: opts.RelayMaxConnectionTimeout,
relayMaxTombs: opts.RelayMaxTombs,
relayTimerVerify: opts.RelayTimerVerification,
enableMPTCP: opts.EnableMPTCP,
dialer: dialCtx,
connContext: opts.ConnContext,
closed: make(chan struct{}),
Expand Down Expand Up @@ -402,7 +414,15 @@ func (ch *Channel) ListenAndServe(hostPort string) error {
return errAlreadyListening
}

l, err := net.Listen("tcp", hostPort)
var l net.Listener
var err error
if ch.enableMPTCP {
lc := &net.ListenConfig{}
lc.SetMultipathTCP(true)
l, err = lc.Listen(context.Background(), "tcp", hostPort)
} else {
l, err = net.Listen("tcp", hostPort)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First, I can confirm the correctness of existing code. The source code:

func Listen(network, address string) (Listener, error) {
	var lc ListenConfig
	return lc.Listen(context.Background(), network, address)
}

We can simplify this block and skip the if/else check. When the option is false, it is safe to set it as well.

lc := &net.ListenConfig{}
lc.SetMultipathTCP(ch.enableMPTCP) 
l, err := lc.Listen(context.Background(), "tcp", hostPort)

if err != nil {
mutable.RUnlock()
return err
Expand Down
22 changes: 22 additions & 0 deletions channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,28 @@ func TestNewChannel(t *testing.T) {
}, ch.PeerInfo(), "Wrong local peer info")
}

func TestNewChannelEnableMPTCP(t *testing.T) {
ch, err := NewChannel("svc", &ChannelOptions{
ProcessName: "pname",
EnableMPTCP: true,
})
require.NoError(t, err, "NewChannel failed")

assert.Equal(t, LocalPeerInfo{
ServiceName: "svc",
PeerInfo: PeerInfo{
ProcessName: "pname",
HostPort: ephemeralHostPort,
IsEphemeral: true,
Version: PeerVersion{
Language: "go",
LanguageVersion: strings.TrimPrefix(runtime.Version(), "go"),
TChannelVersion: VersionInfo,
},
},
}, ch.PeerInfo(), "Wrong local peer info")
}

func TestLoggers(t *testing.T) {
ch, err := NewChannel("svc", &ChannelOptions{
Logger: NewLogger(ioutil.Discard),
Expand Down
6 changes: 6 additions & 0 deletions dial_17.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,9 @@ func dialContext(ctx context.Context, hostPort string) (net.Conn, error) {
d := net.Dialer{}
return d.DialContext(ctx, "tcp", hostPort)
}

func dialMPTCPContext(ctx context.Context, hostPort string) (net.Conn, error) {
d := net.Dialer{}
d.SetMultipathTCP(true)
return d.DialContext(ctx, "tcp", hostPort)
}