Skip to content

Commit

Permalink
Merge pull request #620 from panjf2000/dev
Browse files Browse the repository at this point in the history
patch: v2.5.5
  • Loading branch information
panjf2000 authored Jun 25, 2024
2 parents 6d82dc1 + 66c2259 commit cf12444
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 37 deletions.
46 changes: 22 additions & 24 deletions acceptor_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,18 @@ import (
func (el *eventloop) accept0(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
for {
nfd, sa, err := socket.Accept(fd)
if err != nil {
switch err {
case unix.EAGAIN: // the Accept queue has been drained out, we can return now
return nil
case unix.EINTR, unix.ECONNRESET, unix.ECONNABORTED:
// ECONNRESET or ECONNABORTED could indicate that a socket
// in the Accept queue was closed before we Accept()ed it.
// It's a silly error, let's retry it.
continue
default:
el.getLogger().Errorf("Accept() failed due to error: %v", err)
return errors.ErrAcceptSocket
}
switch err {
case nil:
case unix.EAGAIN: // the Accept queue has been drained out, we can return now
return nil
case unix.EINTR, unix.ECONNRESET, unix.ECONNABORTED:
// ECONNRESET or ECONNABORTED could indicate that a socket
// in the Accept queue was closed before we Accept()ed it.
// It's a silly error, let's retry it.
continue
default:
el.getLogger().Errorf("Accept() failed due to error: %v", err)
return errors.ErrAcceptSocket
}

remoteAddr := socket.SockaddrToTCPOrUnixAddr(sa)
Expand Down Expand Up @@ -71,17 +70,16 @@ func (el *eventloop) accept(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) e
}

nfd, sa, err := socket.Accept(fd)
if err != nil {
switch err {
case unix.EINTR, unix.EAGAIN, unix.ECONNRESET, unix.ECONNABORTED:
// ECONNRESET or ECONNABORTED could indicate that a socket
// in the Accept queue was closed before we Accept()ed it.
// It's a silly error, let's retry it.
return nil
default:
el.getLogger().Errorf("Accept() failed due to error: %v", err)
return errors.ErrAcceptSocket
}
switch err {
case nil:
case unix.EINTR, unix.EAGAIN, unix.ECONNRESET, unix.ECONNABORTED:
// ECONNRESET or ECONNABORTED could indicate that a socket
// in the Accept queue was closed before we Accept()ed it.
// It's a silly error, let's retry it.
return nil
default:
el.getLogger().Errorf("Accept() failed due to error: %v", err)
return errors.ErrAcceptSocket
}

remoteAddr := socket.SockaddrToTCPOrUnixAddr(sa)
Expand Down
33 changes: 29 additions & 4 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
"io"
"math/rand"
"net"
"path/filepath"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

errorx "github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
Expand Down Expand Up @@ -437,7 +439,7 @@ func runClient(t *testing.T, network, addr string, et, reuseport, multicore, asy
clientEV := &clientEvents{tester: t, packetLen: streamLen, svr: ts}
ts.client, err = NewClient(
clientEV,
WithLogLevel(logging.DebugLevel),
WithTCPNoDelay(TCPNoDelay),
WithLockOSThread(true),
WithTicker(true),
)
Expand All @@ -455,7 +457,6 @@ func runClient(t *testing.T, network, addr string, et, reuseport, multicore, asy
WithReusePort(reuseport),
WithTicker(true),
WithTCPKeepAlive(time.Minute*1),
WithTCPNoDelay(TCPDelay),
WithLoadBalancing(lb))
assert.NoError(t, err)
}
Expand Down Expand Up @@ -599,9 +600,22 @@ func testConnWakeImmediately(t *testing.T, client *Client, clientEV *clientEvent
}

func TestWakeConnImmediately(t *testing.T) {
currentLogger, currentFlusher := logging.GetDefaultLogger(), logging.GetDefaultFlusher()
t.Cleanup(func() {
logging.SetDefaultLoggerAndFlusher(currentLogger, currentFlusher) // restore
})

clientEV := &clientEventsForWake{tester: t}
client, err := NewClient(clientEV, WithLogLevel(logging.DebugLevel))
logPath := filepath.Join(t.TempDir(), "gnet-test-wake-conn-immediately.log")
client, err := NewClient(clientEV,
WithSocketRecvBuffer(4*1024),
WithSocketSendBuffer(4*1024),
WithLogPath(logPath),
WithLogLevel(logging.WarnLevel),
WithReadBufferCap(512),
WithWriteBufferCap(512))
assert.NoError(t, err)
logging.Cleanup()

err = client.Start()
assert.NoError(t, err)
Expand All @@ -614,6 +628,11 @@ func TestWakeConnImmediately(t *testing.T) {
}

func TestClientReadOnEOF(t *testing.T) {
currentLogger, currentFlusher := logging.GetDefaultLogger(), logging.GetDefaultFlusher()
t.Cleanup(func() {
logging.SetDefaultLoggerAndFlusher(currentLogger, currentFlusher) // restore
})

ln, err := net.Listen("tcp", "127.0.0.1:9999")
assert.NoError(t, err)
defer ln.Close()
Expand All @@ -635,7 +654,13 @@ func TestClientReadOnEOF(t *testing.T) {
}, 1),
data: []byte("test"),
}
cli, err := NewClient(ev)
cli, err := NewClient(ev,
WithSocketRecvBuffer(4*1024),
WithSocketSendBuffer(4*1024),
WithTCPKeepAlive(time.Minute),
WithLogger(zap.NewExample().Sugar()),
WithReadBufferCap(32*1024),
WithWriteBufferCap(32*1024))
assert.NoError(t, err)
defer cli.Stop() //nolint:errcheck

Expand Down
13 changes: 8 additions & 5 deletions client_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,15 +200,16 @@ func (cli *Client) EnrollContext(c net.Conn, ctx interface{}) (Conn, error) {
)
switch c.(type) {
case *net.UnixConn:
if sockAddr, _, _, err = socket.GetUnixSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String()); err != nil {
sockAddr, _, _, err = socket.GetUnixSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String())
if err != nil {
return nil, err
}
ua := c.LocalAddr().(*net.UnixAddr)
ua.Name = c.RemoteAddr().String() + "." + strconv.Itoa(dupFD)
gc = newTCPConn(dupFD, cli.el, sockAddr, c.LocalAddr(), c.RemoteAddr())
case *net.TCPConn:
if cli.opts.TCPNoDelay == TCPDelay {
if err = socket.SetNoDelay(dupFD, 0); err != nil {
if cli.opts.TCPNoDelay == TCPNoDelay {
if err = socket.SetNoDelay(dupFD, 1); err != nil {
return nil, err
}
}
Expand All @@ -217,12 +218,14 @@ func (cli *Client) EnrollContext(c net.Conn, ctx interface{}) (Conn, error) {
return nil, err
}
}
if sockAddr, _, _, _, err = socket.GetTCPSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String()); err != nil {
sockAddr, _, _, _, err = socket.GetTCPSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String())
if err != nil {
return nil, err
}
gc = newTCPConn(dupFD, cli.el, sockAddr, c.LocalAddr(), c.RemoteAddr())
case *net.UDPConn:
if sockAddr, _, _, _, err = socket.GetUDPSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String()); err != nil {
sockAddr, _, _, _, err = socket.GetUDPSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String())
if err != nil {
return nil, err
}
gc = newUDPConn(dupFD, cli.el, c.LocalAddr(), sockAddr, true)
Expand Down
16 changes: 14 additions & 2 deletions gnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"math/rand"
"net"
"path/filepath"
"runtime"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -642,7 +643,7 @@ func runServer(t *testing.T, addrs []string, et, reuseport, multicore, async, wr
WithReusePort(reuseport),
WithTicker(true),
WithTCPKeepAlive(time.Minute),
WithTCPNoDelay(TCPDelay),
WithTCPNoDelay(TCPNoDelay),
WithLoadBalancing(lb))
} else {
err = Run(ts,
Expand Down Expand Up @@ -883,8 +884,19 @@ func (t *testShutdownServer) OnTick() (delay time.Duration, action Action) {
}

func testShutdown(t *testing.T, network, addr string) {
currentLogger, currentFlusher := logging.GetDefaultLogger(), logging.GetDefaultFlusher()
t.Cleanup(func() {
logging.SetDefaultLoggerAndFlusher(currentLogger, currentFlusher) // restore
})

events := &testShutdownServer{tester: t, network: network, addr: addr, N: 100}
err := Run(events, network+"://"+addr, WithTicker(true), WithReadBufferCap(512), WithWriteBufferCap(512))
logPath := filepath.Join(t.TempDir(), "gnet-test-shutdown.log")
err := Run(events, network+"://"+addr,
WithLogPath(logPath),
WithLogLevel(logging.WarnLevel),
WithTicker(true),
WithReadBufferCap(512),
WithWriteBufferCap(512))
assert.NoError(t, err)
require.Equal(t, 0, int(events.clients), "did not close all clients")
}
Expand Down
7 changes: 5 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,12 @@ type Options struct {

// TCPNoDelay controls whether the operating system should delay
// packet transmission in hopes of sending fewer packets (Nagle's algorithm).
// When this option is assign to TCPNoDelay, TCP_NODELAY socket option will
// be turned on, on the contrary, if it is assigned to TCPDelay, the socket
// option will be turned off.
//
// The default is true (no delay), meaning that data is sent
// as soon as possible after a write operation.
// The default is TCPNoDelay, meaning that TCP_NODELAY is turned on and data
// will not be buffered but sent as soon as possible after a write operation.
TCPNoDelay TCPSocketOpt

// SocketRecvBuffer sets the maximum socket receive buffer in bytes.
Expand Down

0 comments on commit cf12444

Please sign in to comment.