Skip to content

Commit

Permalink
Merge pull request #658 from panjf2000/dev
Browse files Browse the repository at this point in the history
patch: v2.6.1
  • Loading branch information
panjf2000 authored Nov 13, 2024
2 parents 2b0357c + db2fad2 commit c5090c2
Show file tree
Hide file tree
Showing 18 changed files with 249 additions and 207 deletions.
4 changes: 2 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,13 +591,13 @@ func (ev *clientEventsForWake) OnTraffic(c Conn) (action Action) {
assert.Nilf(ev.tester, buf, "expected: %v, but got: %v", nil, buf)
assert.ErrorIsf(ev.tester, err, io.ErrShortBuffer, "expected error: %v, but got: %v", io.ErrShortBuffer, err)
buf, err = c.Next(-1)
assert.Nilf(ev.tester, buf, "expected: %v, but got: %v", nil, buf)
assert.Emptyf(ev.tester, buf, "expected an empty slice, but got: %v", buf)
assert.NoErrorf(ev.tester, err, "expected: %v, but got: %v", nil, err)
buf, err = c.Peek(10)
assert.Nilf(ev.tester, buf, "expected: %v, but got: %v", nil, buf)
assert.ErrorIsf(ev.tester, err, io.ErrShortBuffer, "expected error: %v, but got: %v", io.ErrShortBuffer, err)
buf, err = c.Peek(-1)
assert.Nilf(ev.tester, buf, "expected: %v, but got: %v", nil, buf)
assert.Emptyf(ev.tester, buf, "expected an empty slice, but got: %v", buf)
assert.NoErrorf(ev.tester, err, "expected: %v, but got: %v", nil, err)
n, err = c.Discard(10)
assert.Zerof(ev.tester, n, "expected: %v, but got: %v", 0, n)
Expand Down
6 changes: 3 additions & 3 deletions client_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (cli *Client) Start() error {

// Stop stops the client event-loop.
func (cli *Client) Stop() (err error) {
logging.Error(cli.el.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil))
logging.Error(cli.el.poller.Trigger(queue.HighPriority, func(_ any) error { return errorx.ErrEngineShutdown }, nil))
// Stop the ticker.
if cli.opts.Ticker {
cli.el.engine.ticker.cancel()
Expand All @@ -153,7 +153,7 @@ func (cli *Client) Dial(network, address string) (Conn, error) {
}

// DialContext is like Dial but also accepts an empty interface ctx that can be obtained later via Conn.Context.
func (cli *Client) DialContext(network, address string, ctx interface{}) (Conn, error) {
func (cli *Client) DialContext(network, address string, ctx any) (Conn, error) {
c, err := net.Dial(network, address)
if err != nil {
return nil, err
Expand All @@ -167,7 +167,7 @@ func (cli *Client) Enroll(c net.Conn) (Conn, error) {
}

// EnrollContext is like Enroll but also accepts an empty interface ctx that can be obtained later via Conn.Context.
func (cli *Client) EnrollContext(c net.Conn, ctx interface{}) (Conn, error) {
func (cli *Client) EnrollContext(c net.Conn, ctx any) (Conn, error) {
defer c.Close()

sc, ok := c.(syscall.Conn)
Expand Down
6 changes: 3 additions & 3 deletions client_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
eventHandler: eh,
}
cli.el = &eventloop{
ch: make(chan interface{}, 1024),
ch: make(chan any, 1024),
eng: eng,
connections: make(map[*conn]struct{}),
eventHandler: eh,
Expand Down Expand Up @@ -121,7 +121,7 @@ func (cli *Client) Dial(network, addr string) (Conn, error) {
return cli.DialContext(network, addr, nil)
}

func (cli *Client) DialContext(network, addr string, ctx interface{}) (Conn, error) {
func (cli *Client) DialContext(network, addr string, ctx any) (Conn, error) {
var (
c net.Conn
err error
Expand All @@ -146,7 +146,7 @@ func (cli *Client) Enroll(nc net.Conn) (gc Conn, err error) {
return cli.EnrollContext(nc, nil)
}

func (cli *Client) EnrollContext(nc net.Conn, ctx interface{}) (gc Conn, err error) {
func (cli *Client) EnrollContext(nc net.Conn, ctx any) (gc Conn, err error) {
connOpened := make(chan struct{})
switch v := nc.(type) {
case *net.TCPConn:
Expand Down
34 changes: 19 additions & 15 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
type conn struct {
fd int // file descriptor
gfd gfd.GFD // gnet file descriptor
ctx interface{} // user-defined context
ctx any // user-defined context
remote unix.Sockaddr // remote socket address
localAddr net.Addr // local addr
remoteAddr net.Addr // remote addr
Expand Down Expand Up @@ -243,8 +243,8 @@ type asyncWriteHook struct {
data []byte
}

func (c *conn) asyncWrite(itf interface{}) (err error) {
hook := itf.(*asyncWriteHook)
func (c *conn) asyncWrite(a any) (err error) {
hook := a.(*asyncWriteHook)
defer func() {
if hook.callback != nil {
_ = hook.callback(c, err)
Expand All @@ -264,8 +264,8 @@ type asyncWritevHook struct {
data [][]byte
}

func (c *conn) asyncWritev(itf interface{}) (err error) {
hook := itf.(*asyncWritevHook)
func (c *conn) asyncWritev(a any) (err error) {
hook := a.(*asyncWritevHook)
defer func() {
if hook.callback != nil {
_ = hook.callback(c, err)
Expand Down Expand Up @@ -318,16 +318,18 @@ func (c *conn) Next(n int) (buf []byte, err error) {
} else if n <= 0 {
n = totalLen
}

if c.inboundBuffer.IsEmpty() {
buf = c.buffer[:n]
c.buffer = c.buffer[n:]
return
}

head, tail := c.inboundBuffer.Peek(n)
defer c.inboundBuffer.Discard(n) //nolint:errcheck
c.loop.cache.Reset()
c.loop.cache.Write(head)
if len(head) >= n {
if len(head) == n {
return c.loop.cache.Bytes(), err
}
c.loop.cache.Write(tail)
Expand All @@ -348,12 +350,14 @@ func (c *conn) Peek(n int) (buf []byte, err error) {
} else if n <= 0 {
n = totalLen
}

if c.inboundBuffer.IsEmpty() {
return c.buffer[:n], err
}

head, tail := c.inboundBuffer.Peek(n)
if len(head) >= n {
return head[:n], err
if len(head) == n {
return head, err
}
c.loop.cache.Reset()
c.loop.cache.Write(head)
Expand Down Expand Up @@ -435,10 +439,10 @@ func (c *conn) OutboundBuffered() int {
return c.outboundBuffer.Buffered()
}

func (c *conn) Context() interface{} { return c.ctx }
func (c *conn) SetContext(ctx interface{}) { c.ctx = ctx }
func (c *conn) LocalAddr() net.Addr { return c.localAddr }
func (c *conn) RemoteAddr() net.Addr { return c.remoteAddr }
func (c *conn) Context() any { return c.ctx }
func (c *conn) SetContext(ctx any) { c.ctx = ctx }
func (c *conn) LocalAddr() net.Addr { return c.localAddr }
func (c *conn) RemoteAddr() net.Addr { return c.remoteAddr }

// Implementation of Socket interface

Expand Down Expand Up @@ -485,7 +489,7 @@ func (c *conn) AsyncWritev(bs [][]byte, callback AsyncCallback) error {
}

func (c *conn) Wake(callback AsyncCallback) error {
return c.loop.poller.Trigger(queue.LowPriority, func(_ interface{}) (err error) {
return c.loop.poller.Trigger(queue.LowPriority, func(_ any) (err error) {
err = c.loop.wake(c)
if callback != nil {
_ = callback(c, err)
Expand All @@ -495,7 +499,7 @@ func (c *conn) Wake(callback AsyncCallback) error {
}

func (c *conn) CloseWithCallback(callback AsyncCallback) error {
return c.loop.poller.Trigger(queue.LowPriority, func(_ interface{}) (err error) {
return c.loop.poller.Trigger(queue.LowPriority, func(_ any) (err error) {
err = c.loop.close(c, nil)
if callback != nil {
_ = callback(c, err)
Expand All @@ -505,7 +509,7 @@ func (c *conn) CloseWithCallback(callback AsyncCallback) error {
}

func (c *conn) Close() error {
return c.loop.poller.Trigger(queue.LowPriority, func(_ interface{}) (err error) {
return c.loop.poller.Trigger(queue.LowPriority, func(_ any) (err error) {
err = c.loop.close(c, nil)
return
}, nil)
Expand Down
Loading

0 comments on commit c5090c2

Please sign in to comment.