Skip to content

Commit

Permalink
opt: optimize some buffers methods
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Apr 17, 2024
1 parent e299975 commit 0bf879b
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 63 deletions.
6 changes: 3 additions & 3 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,15 +414,15 @@ func (c *conn) Next(n int) (buf []byte, err error) {

func (c *conn) Peek(n int) (buf []byte, err error) {
if c.loop.engine.opts.EdgeTriggeredIO {
bs := c.elasticBuffer.Peek(n)
bs, err := c.elasticBuffer.Peek(n)
if len(bs) == 1 {
return bs[0], nil
return bs[0], err
}
c.loop.cache.Reset()
for _, b := range bs {
c.loop.cache.Write(b)
}
return c.loop.cache.Bytes(), nil
return c.loop.cache.Bytes(), err
}

inBufferLen := c.inboundBuffer.Buffered()
Expand Down
16 changes: 10 additions & 6 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,16 @@ func (el *eventloop) readLT(c *conn) error {

func (el *eventloop) readET(c *conn) error {
var read int
buf := c.elasticBuffer.AllocNode(MaxStreamBufferCap)
readBlockSize := el.engine.opts.ReadBufferCap
buf := c.elasticBuffer.AllocNode(readBlockSize)
for {
n, err := unix.Read(c.fd, buf[read:])
if err != nil || n == 0 {
if err == unix.EAGAIN {
c.elasticBuffer.Append(buf[:read])
if read == 0 {
c.elasticBuffer.FreeNode(buf)
}
break
}
if n == 0 {
Expand All @@ -158,10 +162,10 @@ func (el *eventloop) readET(c *conn) error {
return el.close(c, os.NewSyscallError("read", err))
}
read += n
if read == MaxStreamBufferCap {
if read == readBlockSize {
c.elasticBuffer.Append(buf)
read = 0
buf = c.elasticBuffer.AllocNode(MaxStreamBufferCap)
read = 0
}
}

Expand All @@ -181,7 +185,7 @@ func (el *eventloop) readET(c *conn) error {
const iovMax = 1024

func (el *eventloop) writeLT(c *conn) error {
iov := c.outboundBuffer.Peek(-1)
iov, _ := c.outboundBuffer.Peek(-1)
var (
n int
err error
Expand Down Expand Up @@ -219,7 +223,7 @@ func (el *eventloop) writeET(c *conn) error {
n int
err error
)
iov := c.outboundBuffer.Peek(-1)
iov, _ := c.outboundBuffer.Peek(-1)
if len(iov) > 1 {
if len(iov) > iovMax {
iov = iov[:iovMax]

Check warning on line 229 in eventloop_unix.go

View check run for this annotation

Codecov / codecov/patch

eventloop_unix.go#L229

Added line #L229 was not covered by tests
Expand Down Expand Up @@ -264,7 +268,7 @@ func (el *eventloop) close(c *conn, err error) (rerr error) {
// Send residual data in buffer back to the peer before actually closing the connection.
if !c.outboundBuffer.IsEmpty() {
for !c.outboundBuffer.IsEmpty() {
iov := c.outboundBuffer.Peek(0)
iov, _ := c.outboundBuffer.Peek(0)

Check warning on line 271 in eventloop_unix.go

View check run for this annotation

Codecov / codecov/patch

eventloop_unix.go#L271

Added line #L271 was not covered by tests
if len(iov) > iovMax {
iov = iov[:iovMax]
}
Expand Down
54 changes: 25 additions & 29 deletions gnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,39 +167,35 @@ func (e Engine) Wake(fd gfd.GFD, cb AsyncCallback) error {

// Reader is an interface that consists of a number of methods for reading that Conn must implement.
//
// Note that the methods in this interface are not goroutine-safe for concurrent use,
// Note that the methods in this interface are not concurrency-safe for concurrent use,
// you must invoke them within any method in EventHandler.
type Reader interface {
io.Reader
io.WriterTo

// Next returns a slice containing the next n bytes from the buffer,
// advancing the buffer as if the bytes had been returned by Read.
// If there are fewer than n bytes in the buffer, Next returns the entire buffer.
// The error is ErrBufferFull if n is larger than b's buffer size.
// Calling this method has the same effect as calling Peek and Discard.
// If the amount of the available bytes is less than requested, a pair of (0, io.ErrShortBuffer)
// is returned.
//
// Note that the []byte buf returned by Next() is not allowed to be passed to a new goroutine,
// as this []byte will be reused within event-loop.
// If you have to use buf in a new goroutine, then you need to make a copy of buf and pass this copy
// to that new goroutine.
Next(n int) (buf []byte, err error)

// Peek returns the next n bytes without advancing the reader. The bytes stop
// being valid at the next read call. If Peek returns fewer than n bytes, it
// also returns an error explaining why the read is short. The error is
// ErrBufferFull if n is larger than b's buffer size.
// Peek returns the next n bytes without advancing the inbound buffer, the returned bytes
// remain valid until a Discard is called. If the amount of the available bytes is
// less than requested, a pair of (0, io.ErrShortBuffer) is returned.
//
// Note that the []byte buf returned by Peek() is not allowed to be passed to a new goroutine,
// as this []byte will be reused within event-loop.
// If you have to use buf in a new goroutine, then you need to make a copy of buf and pass this copy
// to that new goroutine.
Peek(n int) (buf []byte, err error)

// Discard skips the next n bytes, returning the number of bytes discarded.
//
// If Discard skips fewer than n bytes, it also returns an error.
// If 0 <= n <= b.Buffered(), Discard is guaranteed to succeed without
// reading from the underlying io.Reader.
// Discard advances the inbound buffer with next n bytes, returning the number of bytes discarded.
Discard(n int) (discarded int, err error)

// InboundBuffered returns the number of bytes that can be read from the current buffer.
Expand All @@ -208,22 +204,22 @@ type Reader interface {

// Writer is an interface that consists of a number of methods for writing that Conn must implement.
type Writer interface {
io.Writer // not goroutine-safe
io.ReaderFrom // not goroutine-safe
io.Writer // not concurrency-safe
io.ReaderFrom // not concurrency-safe

// Writev writes multiple byte slices to peer synchronously, it's not goroutine-safe,
// Writev writes multiple byte slices to peer synchronously, it's not concurrency-safe,
// you must invoke it within any method in EventHandler.
Writev(bs [][]byte) (n int, err error)

// Flush writes any buffered data to the underlying connection, it's not goroutine-safe,
// Flush writes any buffered data to the underlying connection, it's not concurrency-safe,
// you must invoke it within any method in EventHandler.
Flush() (err error)

// OutboundBuffered returns the number of bytes that can be read from the current buffer.
// it's not goroutine-safe, you must invoke it within any method in EventHandler.
// it's not concurrency-safe, you must invoke it within any method in EventHandler.
OutboundBuffered() (n int)

// AsyncWrite writes bytes to peer asynchronously, it's goroutine-safe,
// AsyncWrite writes bytes to peer asynchronously, it's concurrency-safe,
// you don't have to invoke it within any method in EventHandler,
// usually you would call it in an individual goroutine.
//
Expand All @@ -247,7 +243,7 @@ type AsyncCallback func(c Conn, err error) error

// Socket is a set of functions which manipulate the underlying file descriptor of a connection.
//
// Note that the methods in this interface are goroutine-safe for concurrent use,
// Note that the methods in this interface are concurrency-safe for concurrent use,
// you don't have to invoke them within any method in EventHandler.
type Socket interface {
// Gfd returns the gfd of socket.
Expand Down Expand Up @@ -302,35 +298,35 @@ type Socket interface {

// Conn is an interface of underlying connection.
type Conn interface {
Reader // all methods in Reader are not goroutine-safe.
Writer // some methods in Writer are goroutine-safe, some are not.
Socket // all methods in Socket are goroutine-safe.
Reader // all methods in Reader are not concurrency-safe.
Writer // some methods in Writer are concurrency-safe, some are not.
Socket // all methods in Socket are concurrency-safe.

// Context returns a user-defined context, it's not goroutine-safe,
// Context returns a user-defined context, it's not concurrency-safe,
// you must invoke it within any method in EventHandler.
Context() (ctx interface{})

// SetContext sets a user-defined context, it's not goroutine-safe,
// SetContext sets a user-defined context, it's not concurrency-safe,
// you must invoke it within any method in EventHandler.
SetContext(ctx interface{})

// LocalAddr is the connection's local socket address, it's not goroutine-safe,
// LocalAddr is the connection's local socket address, it's not concurrency-safe,
// you must invoke it within any method in EventHandler.
LocalAddr() (addr net.Addr)

// RemoteAddr is the connection's remote peer address, it's not goroutine-safe,
// RemoteAddr is the connection's remote peer address, it's not concurrency-safe,
// you must invoke it within any method in EventHandler.
RemoteAddr() (addr net.Addr)

// Wake triggers a OnTraffic event for the current connection, it's goroutine-safe.
// Wake triggers a OnTraffic event for the current connection, it's concurrency-safe.
Wake(callback AsyncCallback) (err error)

// CloseWithCallback closes the current connection, it's goroutine-safe.
// CloseWithCallback closes the current connection, it's concurrency-safe.
// Usually you should provide a non-nil callback for this method,
// otherwise your better choice is Close().
CloseWithCallback(callback AsyncCallback) (err error)

// Close closes the current connection, implements net.Conn, it's goroutine-safe.
// Close closes the current connection, implements net.Conn, it's concurrency-safe.
Close() (err error)

// SetDeadline implements net.Conn.
Expand Down
15 changes: 10 additions & 5 deletions pkg/buffer/elastic/elastic_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,24 @@ func TestMixedBuffer_Basic(t *testing.T) {
require.EqualValues(t, newDataLen, mb.Buffered())
require.EqualValues(t, rbn, mb.ringBuffer.Buffered())

bs := mb.Peek(-1)
bs, err := mb.Peek(-1)
require.NoError(t, err)
var p []byte
for _, b := range bs {
p = append(p, b...)
}
require.EqualValues(t, data, p)

bs = mb.Peek(rbn)
bs, err = mb.Peek(rbn)
require.NoError(t, err)
p = bs[0]
require.EqualValues(t, data[:rbn], p)
n, err = mb.Discard(rbn)
require.NoError(t, err)
require.EqualValues(t, rbn, n)
require.NotNil(t, mb.ringBuffer)
bs = mb.Peek(newDataLen - rbn)
bs, err = mb.Peek(newDataLen - rbn)
require.NoError(t, err)
p = bs[0]
require.EqualValues(t, data[rbn:], p)
n, err = mb.Discard(newDataLen - rbn)
Expand Down Expand Up @@ -82,7 +85,8 @@ func TestMixedBuffer_Basic(t *testing.T) {
require.NoError(t, err)
require.EqualValues(t, cum-headCum, n)
require.EqualValues(t, cum, mb.Buffered())
bs = mb.Peek(-1)
bs, err = mb.Peek(-1)
require.NoError(t, err)
p = p[:0]
for _, b := range bs {
p = append(p, b...)
Expand Down Expand Up @@ -125,7 +129,8 @@ func TestMixedBuffer_ReadFrom(t *testing.T) {
require.NoError(t, err)
require.EqualValues(t, dataLen, m)
require.EqualValues(t, data, buf)
bs := mb.Peek(dataLen)
bs, err := mb.Peek(dataLen)
require.NoError(t, err)
var p []byte
for _, b := range bs {
p = append(p, b...)
Expand Down
8 changes: 5 additions & 3 deletions pkg/buffer/elastic/elastic_ring_list_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,15 @@ func (mb *Buffer) Read(p []byte) (n int, err error) {
}

// Peek returns n bytes as [][]byte, these bytes won't be discarded until Buffer.Discard() is called.
func (mb *Buffer) Peek(n int) [][]byte {
if n <= 0 {
func (mb *Buffer) Peek(n int) ([][]byte, error) {
if n <= 0 || n == math.MaxInt32 {
n = math.MaxInt32
} else if n > mb.Buffered() {
return nil, io.ErrShortBuffer
}
head, tail := mb.ringBuffer.Peek(n)
if mb.ringBuffer.Buffered() >= n {
return [][]byte{head, tail}
return [][]byte{head, tail}, nil
}
return mb.listBuffer.PeekWithBytes(n, head, tail)
}
Expand Down
45 changes: 32 additions & 13 deletions pkg/buffer/linkedlist/linked_list_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ func (llb *Buffer) Read(p []byte) (n int, err error) {
return
}
}
if n == 0 {
err = io.EOF
}
return
}

Expand Down Expand Up @@ -114,43 +117,59 @@ func (llb *Buffer) PushBack(p []byte) {

// Peek assembles the up to maxBytes of [][]byte based on the list of node,
// it won't remove these nodes from l until Discard() is called.
func (llb *Buffer) Peek(maxBytes int) [][]byte {
if maxBytes <= 0 {
func (llb *Buffer) Peek(maxBytes int) ([][]byte, error) {
if maxBytes <= 0 || maxBytes == math.MaxInt32 {
maxBytes = math.MaxInt32
} else if maxBytes > llb.Buffered() {
return nil, io.ErrShortBuffer
}
llb.bs = llb.bs[:0]
var cum int
for iter := llb.head; iter != nil; iter = iter.next {
llb.bs = append(llb.bs, iter.buf)
if cum += iter.len(); cum >= maxBytes {
offset := iter.len()
if cum+offset > maxBytes {
offset = maxBytes - cum
}
llb.bs = append(llb.bs, iter.buf[:offset])
if cum += offset; cum == maxBytes {
break
}
}
return llb.bs
return llb.bs, nil
}

// PeekWithBytes is like Peek but accepts [][]byte and puts them onto head.
func (llb *Buffer) PeekWithBytes(maxBytes int, bs ...[]byte) [][]byte {
if maxBytes <= 0 {
func (llb *Buffer) PeekWithBytes(maxBytes int, bs ...[]byte) ([][]byte, error) {
if maxBytes <= 0 || maxBytes == math.MaxInt32 {
maxBytes = math.MaxInt32
} else if maxBytes > llb.Buffered() {
return nil, io.ErrShortBuffer
}
llb.bs = llb.bs[:0]
var cum int
for _, b := range bs {
if n := len(b); n > 0 {
llb.bs = append(llb.bs, b)
if cum += n; cum >= maxBytes {
return llb.bs
offset := n
if cum+offset > maxBytes {
offset = maxBytes - cum
}
llb.bs = append(llb.bs, b[:offset])
if cum += offset; cum == maxBytes {
return llb.bs, nil
}
}
}
for iter := llb.head; iter != nil; iter = iter.next {
llb.bs = append(llb.bs, iter.buf)
if cum += iter.len(); cum >= maxBytes {
offset := iter.len()
if cum+offset > maxBytes {
offset = maxBytes - cum
}
llb.bs = append(llb.bs, iter.buf[:offset])
if cum += offset; cum == maxBytes {
break
}
}
return llb.bs
return llb.bs, nil
}

// Discard removes some nodes based on n bytes.
Expand Down
10 changes: 6 additions & 4 deletions pkg/buffer/linkedlist/llbuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,27 @@ func TestLinkedListBuffer_Basic(t *testing.T) {
require.EqualValues(t, maxBlocks, llb.Len())
require.EqualValues(t, cum, llb.Buffered())

bs := llb.Peek(cum / 4)
bs, err := llb.Peek(cum / 4)
require.NoError(t, err)
var p []byte
for _, b := range bs {
p = append(p, b...)
}
pn := len(p)
require.GreaterOrEqual(t, pn, cum/4)
require.EqualValues(t, pn, cum/4)
require.EqualValues(t, buf.Bytes()[:pn], p)
tmpA := make([]byte, cum/16)
tmpB := make([]byte, cum/16)
rand.Read(tmpA)
rand.Read(tmpB)
bs = llb.PeekWithBytes(cum/4, tmpA, tmpB)
bs, err = llb.PeekWithBytes(cum/4, tmpA, tmpB)
require.NoError(t, err)
p = p[:0]
for _, b := range bs {
p = append(p, b...)
}
pn = len(p)
require.GreaterOrEqual(t, pn, cum/4)
require.EqualValues(t, pn, cum/4)
var tmpBuf bytes.Buffer
tmpBuf.Write(tmpA)
tmpBuf.Write(tmpB)
Expand Down

0 comments on commit 0bf879b

Please sign in to comment.