From 3e1efa6be172c0d4a4bcc8a84c9fadbb3666f974 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Mon, 11 Nov 2024 17:40:18 +0800 Subject: [PATCH] opt: optimize Conn.Next and Conn.Peek Besides, add tests of partial read. --- connection_unix.go | 10 ++++-- gnet_test.go | 78 +++++++++++++++++++++++++++++----------------- 2 files changed, 57 insertions(+), 31 deletions(-) diff --git a/connection_unix.go b/connection_unix.go index 3ccac4ab1..d399c9361 100644 --- a/connection_unix.go +++ b/connection_unix.go @@ -318,16 +318,18 @@ func (c *conn) Next(n int) (buf []byte, err error) { } else if n <= 0 { n = totalLen } + if c.inboundBuffer.IsEmpty() { buf = c.buffer[:n] c.buffer = c.buffer[n:] return } + head, tail := c.inboundBuffer.Peek(n) defer c.inboundBuffer.Discard(n) //nolint:errcheck c.loop.cache.Reset() c.loop.cache.Write(head) - if len(head) >= n { + if len(head) == n { return c.loop.cache.Bytes(), err } c.loop.cache.Write(tail) @@ -348,12 +350,14 @@ func (c *conn) Peek(n int) (buf []byte, err error) { } else if n <= 0 { n = totalLen } + if c.inboundBuffer.IsEmpty() { return c.buffer[:n], err } + head, tail := c.inboundBuffer.Peek(n) - if len(head) >= n { - return head[:n], err + if len(head) == n { + return head, err } c.loop.cache.Reset() c.loop.cache.Write(head) diff --git a/gnet_test.go b/gnet_test.go index 201c6b9de..b6755e038 100644 --- a/gnet_test.go +++ b/gnet_test.go @@ -8,6 +8,7 @@ import ( "encoding/binary" "errors" "io" + "math" "math/rand" "net" "path/filepath" @@ -1542,7 +1543,8 @@ type simServer struct { multicore bool nclients int packetSize int - packetBatch int + batchWrite int + batchRead int started int32 connected int32 disconnected int32 @@ -1579,7 +1581,7 @@ func (s *simServer) OnClose(_ Conn, err error) (action Action) { func (s *simServer) OnTraffic(c Conn) (action Action) { codec := c.Context().(*testCodec) var packets [][]byte - for { + for i := 0; i < s.batchRead; i++ { data, err := codec.Decode(c) if errors.Is(err, errIncompletePacket) { break @@ -1596,6 +1598,10 @@ func (s *simServer) OnTraffic(c Conn) (action Action) { } else if n == 1 { _, _ = c.Write(packets[0]) } + if len(packets) == s.batchRead && c.InboundBuffered() > 0 { + err := c.Wake(nil) // wake up the connection manually to avoid missing the leftover data + assert.NoError(s.tester, err) + } return } @@ -1603,7 +1609,7 @@ func (s *simServer) OnTick() (delay time.Duration, action Action) { if atomic.CompareAndSwapInt32(&s.started, 0, 1) { for i := 0; i < s.nclients; i++ { go func() { - runSimClient(s.tester, s.network, s.addr, s.packetSize, s.packetBatch) + runSimClient(s.tester, s.network, s.addr, s.packetSize, s.batchWrite) }() } } @@ -1651,11 +1657,14 @@ func (codec testCodec) Encode(buf []byte) ([]byte, error) { return data, nil } -func (codec *testCodec) Decode(c Conn) ([]byte, error) { +func (codec testCodec) Decode(c Conn) ([]byte, error) { bodyOffset := magicNumberSize + bodySize - buf, _ := c.Peek(bodyOffset) - if len(buf) < bodyOffset { - return nil, errIncompletePacket + buf, err := c.Peek(bodyOffset) + if err != nil { + if errors.Is(err, io.ErrShortBuffer) { + err = errIncompletePacket + } + return nil, err } if !bytes.Equal(magicNumberBytes, buf[:magicNumberSize]) { @@ -1664,13 +1673,18 @@ func (codec *testCodec) Decode(c Conn) ([]byte, error) { bodyLen := binary.BigEndian.Uint32(buf[magicNumberSize:bodyOffset]) msgLen := bodyOffset + int(bodyLen) - if c.InboundBuffered() < msgLen { - return nil, errIncompletePacket + buf, err = c.Peek(msgLen) + if err != nil { + if errors.Is(err, io.ErrShortBuffer) { + err = errIncompletePacket + } + return nil, err } - buf, _ = c.Peek(msgLen) + body := make([]byte, bodyLen) + copy(body, buf[bodyOffset:msgLen]) _, _ = c.Discard(msgLen) - return buf[bodyOffset:msgLen], nil + return body, nil } func (codec testCodec) Unpack(buf []byte) ([]byte, error) { @@ -1693,41 +1707,48 @@ func (codec testCodec) Unpack(buf []byte) ([]byte, error) { } func TestSimServer(t *testing.T) { + t.Run("packet-size=64,batch=200", func(t *testing.T) { + runSimServer(t, ":7200", true, 10, 64, 200, -1) + }) t.Run("packet-size=128,batch=100", func(t *testing.T) { - runSimServer(t, ":7200", false, 10, 128, 100) + runSimServer(t, ":7201", false, 10, 128, 100, 10) }) t.Run("packet-size=256,batch=50", func(t *testing.T) { - runSimServer(t, ":7201", true, 10, 256, 50) + runSimServer(t, ":7202", true, 10, 256, 50, -1) }) t.Run("packet-size=512,batch=30", func(t *testing.T) { - runSimServer(t, ":7202", false, 10, 512, 30) + runSimServer(t, ":7203", false, 10, 512, 30, 3) }) t.Run("packet-size=1024,batch=20", func(t *testing.T) { - runSimServer(t, ":7203", true, 10, 1024, 20) + runSimServer(t, ":7204", true, 10, 1024, 20, -1) }) t.Run("packet-size=64*1024,batch=10", func(t *testing.T) { - runSimServer(t, ":7204", false, 10, 64*1024, 10) + runSimServer(t, ":7205", false, 10, 64*1024, 10, 1) }) t.Run("packet-size=128*1024,batch=5", func(t *testing.T) { - runSimServer(t, ":7205", true, 10, 128*1024, 5) + runSimServer(t, ":7206", true, 10, 128*1024, 5, -1) }) t.Run("packet-size=512*1024,batch=3", func(t *testing.T) { - runSimServer(t, ":7206", false, 10, 512*1024, 3) + runSimServer(t, ":7207", false, 10, 512*1024, 3, 1) }) t.Run("packet-size=1024*1024,batch=2", func(t *testing.T) { - runSimServer(t, ":7207", true, 10, 1024*1024, 2) + runSimServer(t, ":7208", true, 10, 1024*1024, 2, -1) }) } -func runSimServer(t *testing.T, addr string, et bool, nclients, packetSize, packetBatch int) { +func runSimServer(t *testing.T, addr string, et bool, nclients, packetSize, batchWrite, batchRead int) { ts := &simServer{ - tester: t, - network: "tcp", - addr: addr, - multicore: true, - nclients: nclients, - packetSize: packetSize, - packetBatch: packetBatch, + tester: t, + network: "tcp", + addr: addr, + multicore: true, + nclients: nclients, + packetSize: packetSize, + batchWrite: batchWrite, + batchRead: batchRead, + } + if batchRead < 0 { + ts.batchRead = math.MaxInt32 // unlimited read batch } err := Run(ts, ts.network+"://"+ts.addr, @@ -1789,6 +1810,7 @@ func batchSendAndRecv(t *testing.T, c net.Conn, rd *bufio.Reader, packetSize, ba for i, req := range requests { rsp, err := codec.Unpack(respPacket[i*packetLen:]) require.NoError(t, err) - require.Equalf(t, req, rsp, "request and response mismatch, packet size: %d, batch: %d", packetSize, batch) + require.Equalf(t, req, rsp, "request and response mismatch, packet size: %d, batch: %d, round: %d", + packetSize, batch, i) } }