Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support edge-triggered I/O #576

Merged
merged 9 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions acceptor_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (eng *engine) accept1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
if err != nil {
switch err {
case unix.EINTR, unix.EAGAIN, unix.ECONNABORTED:
// ECONNABORTED means that a socket on the listen
// ECONNABORTED indicates that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
return nil
Expand Down Expand Up @@ -66,11 +66,11 @@ func (el *eventloop) accept1(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags)
return el.readUDP1(fd, ev, flags)
}

nfd, sa, err := socket.Accept(el.ln.fd)
nfd, sa, err := socket.Accept(fd)
if err != nil {
switch err {
case unix.EINTR, unix.EAGAIN, unix.ECONNABORTED:
// ECONNABORTED means that a socket on the listen
// ECONNABORTED indicates that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
return nil
Expand All @@ -87,7 +87,11 @@ func (el *eventloop) accept1(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags)
}

c := newTCPConn(nfd, el, sa, el.ln.addr, remoteAddr)
if err = el.poller.AddRead(&c.pollAttachment); err != nil {
addEvents := el.poller.AddRead
if el.engine.opts.EdgeTriggeredIO {
addEvents = el.poller.AddReadWrite
}
if err = addEvents(&c.pollAttachment, el.engine.opts.EdgeTriggeredIO); err != nil {
return err
}
el.connections.addConn(c, el.idx)
Expand Down
178 changes: 140 additions & 38 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type connHandler struct {
type clientEvents struct {
*BuiltinEventEngine
tester *testing.T
svr *testClientServer
svr *testClient
packetLen int
}

Expand Down Expand Up @@ -87,117 +87,219 @@ func (ev *clientEvents) OnShutdown(e Engine) {
assert.EqualValuesf(ev.tester, fd, -1, "expected -1, but got: %d", fd)
}

func TestServeWithGnetClient(t *testing.T) {
func TestClient(t *testing.T) {
// start an engine
// connect 10 clients
// each client will pipe random data for 1-3 seconds.
// the writes to the engine will be random sizes. 0KB - 1MB.
// the engine will echo back the data.
// waits for graceful connection closing.
t.Run("poll", func(t *testing.T) {
t.Run("poll-LT", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9991", false, false, false, false, 10, RoundRobin)
runClient(t, "tcp", ":9991", false, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9992", false, false, true, false, 10, LeastConnections)
runClient(t, "tcp", ":9992", false, false, true, false, 10, LeastConnections)
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9991", false, false, false, true, 10, RoundRobin)
runClient(t, "tcp", ":9991", false, false, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9992", false, false, true, true, 10, LeastConnections)
runClient(t, "tcp", ":9992", false, false, true, true, 10, LeastConnections)
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9991", false, false, false, false, 10, RoundRobin)
runClient(t, "udp", ":9991", false, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9992", false, false, true, false, 10, LeastConnections)
runClient(t, "udp", ":9992", false, false, true, false, 10, LeastConnections)
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9991", false, false, false, true, 10, RoundRobin)
runClient(t, "udp", ":9991", false, false, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9992", false, false, true, true, 10, LeastConnections)
runClient(t, "udp", ":9992", false, false, true, true, 10, LeastConnections)
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet1.sock", false, false, false, false, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", false, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet2.sock", false, false, true, false, 10, SourceAddrHash)
runClient(t, "unix", "gnet2.sock", false, false, true, false, 10, SourceAddrHash)
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet1.sock", false, false, false, true, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", false, false, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet2.sock", false, false, true, true, 10, SourceAddrHash)
runClient(t, "unix", "gnet2.sock", false, false, true, true, 10, SourceAddrHash)
})
})
})

t.Run("poll-reuseport", func(t *testing.T) {
t.Run("poll-ET", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9991", true, true, false, false, 10, RoundRobin)
runClient(t, "tcp", ":9991", true, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
runClient(t, "tcp", ":9992", true, false, true, false, 10, LeastConnections)
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9991", true, true, false, true, 10, RoundRobin)
runClient(t, "tcp", ":9991", true, false, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
runClient(t, "tcp", ":9992", true, false, true, true, 10, LeastConnections)
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
runClient(t, "udp", ":9991", true, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9992", true, true, true, false, 10, LeastConnections)
runClient(t, "udp", ":9992", true, false, true, false, 10, LeastConnections)
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
runClient(t, "udp", ":9991", true, false, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9992", true, true, true, true, 10, LeastConnections)
runClient(t, "udp", ":9992", true, false, true, true, 10, LeastConnections)
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet1.sock", true, true, false, false, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", true, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet2.sock", true, true, true, false, 10, LeastConnections)
runClient(t, "unix", "gnet2.sock", true, false, true, false, 10, SourceAddrHash)
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet1.sock", true, true, false, true, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", true, false, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet2.sock", true, true, true, true, 10, LeastConnections)
runClient(t, "unix", "gnet2.sock", true, false, true, true, 10, SourceAddrHash)
})
})
})

t.Run("poll-LT-reuseport", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", false, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", false, true, true, false, 10, LeastConnections)
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", false, true, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", false, true, true, false, 10, LeastConnections)
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", false, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", false, true, true, false, 10, LeastConnections)
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", false, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", false, true, true, true, 10, LeastConnections)
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", false, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", false, true, true, false, 10, LeastConnections)
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", false, true, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", false, true, true, true, 10, LeastConnections)
})
})
})

t.Run("poll-ET-reuseport", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", true, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", true, true, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", true, true, true, false, 10, LeastConnections)
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", true, true, true, true, 10, LeastConnections)
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", true, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", true, true, true, false, 10, LeastConnections)
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", true, true, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", true, true, true, true, 10, LeastConnections)
})
})
})
}

type testClientServer struct {
type testClient struct {
*BuiltinEventEngine
client *Client
tester *testing.T
Expand All @@ -215,20 +317,20 @@ type testClientServer struct {
udpReadHeader int32
}

func (s *testClientServer) OnBoot(eng Engine) (action Action) {
func (s *testClient) OnBoot(eng Engine) (action Action) {
s.eng = eng
return
}

func (s *testClientServer) OnOpen(c Conn) (out []byte, action Action) {
func (s *testClient) OnOpen(c Conn) (out []byte, action Action) {
c.SetContext(&sync.Once{})
atomic.AddInt32(&s.connected, 1)
require.NotNil(s.tester, c.LocalAddr(), "nil local addr")
require.NotNil(s.tester, c.RemoteAddr(), "nil remote addr")
return
}

func (s *testClientServer) OnClose(c Conn, err error) (action Action) {
func (s *testClient) OnClose(c Conn, err error) (action Action) {
if err != nil {
logging.Debugf("error occurred on closed, %v\n", err)
}
Expand All @@ -246,13 +348,13 @@ func (s *testClientServer) OnClose(c Conn, err error) (action Action) {
return
}

func (s *testClientServer) OnShutdown(Engine) {
func (s *testClient) OnShutdown(Engine) {
if s.network == "udp" {
require.EqualValues(s.tester, int32(s.nclients), atomic.LoadInt32(&s.udpReadHeader))
}
}

func (s *testClientServer) OnTraffic(c Conn) (action Action) {
func (s *testClient) OnTraffic(c Conn) (action Action) {
readHeader := func() {
ping := make([]byte, len(pingMsg))
n, err := io.ReadFull(c, ping)
Expand Down Expand Up @@ -302,7 +404,7 @@ func (s *testClientServer) OnTraffic(c Conn) (action Action) {
return
}

func (s *testClientServer) OnTick() (delay time.Duration, action Action) {
func (s *testClient) OnTick() (delay time.Duration, action Action) {
delay = time.Second / 5
if atomic.CompareAndSwapInt32(&s.started, 0, 1) {
for i := 0; i < s.nclients; i++ {
Expand All @@ -321,8 +423,8 @@ func (s *testClientServer) OnTick() (delay time.Duration, action Action) {
return
}

func testServeWithGnetClient(t *testing.T, network, addr string, reuseport, reuseaddr, multicore, async bool, nclients int, lb LoadBalancing) {
ts := &testClientServer{
func runClient(t *testing.T, network, addr string, et, reuseport, multicore, async bool, nclients int, lb LoadBalancing) {
ts := &testClient{
tester: t,
network: network,
addr: addr,
Expand All @@ -347,10 +449,10 @@ func testServeWithGnetClient(t *testing.T, network, addr string, reuseport, reus

err = Run(ts,
network+"://"+addr,
WithEdgeTriggeredIO(et),
WithLockOSThread(async),
WithMulticore(multicore),
WithReusePort(reuseport),
WithReuseAddr(reuseaddr),
WithTicker(true),
WithTCPKeepAlive(time.Minute*1),
WithTCPNoDelay(TCPDelay),
Expand Down
Loading
Loading