Skip to content

Commit

Permalink
Add MPTCP support
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiyipanuber committed Aug 20, 2023
1 parent d19f32e commit 6b6241f
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 2 deletions.
24 changes: 22 additions & 2 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ type ChannelOptions struct {
// This is an unstable API - breaking changes are likely.
RelayTimerVerification bool

// EnableMPTCP enables MPTCP for TCP network connection to increase reliability.
// It requires underlying operating system support MPTCP.
// If EnableMPTCP is false or no MPTCP support, the connection will use normal TCP.
// It's set to false by default.
EnableMPTCP bool

// The reporter to use for reporting stats for this channel.
StatsReporter StatsReporter

Expand Down Expand Up @@ -184,6 +190,7 @@ type Channel struct {
relayMaxConnTimeout time.Duration
relayMaxTombs uint64
relayTimerVerify bool
enableMPTCP bool
internalHandlers *handlerMap
handler Handler
onPeerStatusChanged func(*Peer)
Expand Down Expand Up @@ -275,8 +282,12 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
return nil, err
}

// Default to dialContext if dialer is not passed in as an option
// Default to dialContext or dialMPTCPContex
// if dialer is not passed in as an option
dialCtx := dialContext
if opts.EnableMPTCP {
dialCtx = dialMPTCPContext
}
if opts.Dialer != nil {
dialCtx = func(ctx context.Context, hostPort string) (net.Conn, error) {
return opts.Dialer(ctx, "tcp", hostPort)
Expand Down Expand Up @@ -306,6 +317,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
relayMaxConnTimeout: opts.RelayMaxConnectionTimeout,
relayMaxTombs: opts.RelayMaxTombs,
relayTimerVerify: opts.RelayTimerVerification,
enableMPTCP: opts.EnableMPTCP,
dialer: dialCtx,
connContext: opts.ConnContext,
closed: make(chan struct{}),
Expand Down Expand Up @@ -402,7 +414,15 @@ func (ch *Channel) ListenAndServe(hostPort string) error {
return errAlreadyListening
}

l, err := net.Listen("tcp", hostPort)
var l net.Listener
var err error
if ch.enableMPTCP {
lc := &net.ListenConfig{}
lc.SetMultipathTCP(true)
l, err = lc.Listen(context.Background(), "tcp", hostPort)
} else {
l, err = net.Listen("tcp", hostPort)
}
if err != nil {
mutable.RUnlock()
return err
Expand Down
22 changes: 22 additions & 0 deletions channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,28 @@ func TestNewChannel(t *testing.T) {
}, ch.PeerInfo(), "Wrong local peer info")
}

func TestNewChannelEnableMPTCP(t *testing.T) {
ch, err := NewChannel("svc", &ChannelOptions{
ProcessName: "pname",
EnableMPTCP: true,
})
require.NoError(t, err, "NewChannel failed")

assert.Equal(t, LocalPeerInfo{
ServiceName: "svc",
PeerInfo: PeerInfo{
ProcessName: "pname",
HostPort: ephemeralHostPort,
IsEphemeral: true,
Version: PeerVersion{
Language: "go",
LanguageVersion: strings.TrimPrefix(runtime.Version(), "go"),
TChannelVersion: VersionInfo,
},
},
}, ch.PeerInfo(), "Wrong local peer info")
}

func TestLoggers(t *testing.T) {
ch, err := NewChannel("svc", &ChannelOptions{
Logger: NewLogger(ioutil.Discard),
Expand Down
6 changes: 6 additions & 0 deletions dial_17.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,9 @@ func dialContext(ctx context.Context, hostPort string) (net.Conn, error) {
d := net.Dialer{}
return d.DialContext(ctx, "tcp", hostPort)
}

func dialMPTCPContext(ctx context.Context, hostPort string) (net.Conn, error) {
d := net.Dialer{}
d.SetMultipathTCP(true)
return d.DialContext(ctx, "tcp", hostPort)
}

0 comments on commit 6b6241f

Please sign in to comment.