Skip to content

Commit

Permalink
chore: rename some parameters
Browse files Browse the repository at this point in the history
interface{} --> any
itf/i --> a
queue.TaskFunc --> queue.Func
queue.Run --> queue.Exec
arg --> parameter

Also, update some comments to make more sense.
  • Loading branch information
panjf2000 committed Nov 13, 2024
1 parent a0c9787 commit 47ab6a3
Show file tree
Hide file tree
Showing 16 changed files with 83 additions and 81 deletions.
6 changes: 3 additions & 3 deletions client_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (cli *Client) Start() error {

// Stop stops the client event-loop.
func (cli *Client) Stop() (err error) {
logging.Error(cli.el.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil))
logging.Error(cli.el.poller.Trigger(queue.HighPriority, func(_ any) error { return errorx.ErrEngineShutdown }, nil))
// Stop the ticker.
if cli.opts.Ticker {
cli.el.engine.ticker.cancel()
Expand All @@ -153,7 +153,7 @@ func (cli *Client) Dial(network, address string) (Conn, error) {
}

// DialContext is like Dial but also accepts an empty interface ctx that can be obtained later via Conn.Context.
func (cli *Client) DialContext(network, address string, ctx interface{}) (Conn, error) {
func (cli *Client) DialContext(network, address string, ctx any) (Conn, error) {
c, err := net.Dial(network, address)
if err != nil {
return nil, err
Expand All @@ -167,7 +167,7 @@ func (cli *Client) Enroll(c net.Conn) (Conn, error) {
}

// EnrollContext is like Enroll but also accepts an empty interface ctx that can be obtained later via Conn.Context.
func (cli *Client) EnrollContext(c net.Conn, ctx interface{}) (Conn, error) {
func (cli *Client) EnrollContext(c net.Conn, ctx any) (Conn, error) {
defer c.Close()

sc, ok := c.(syscall.Conn)
Expand Down
6 changes: 3 additions & 3 deletions client_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
eventHandler: eh,
}
cli.el = &eventloop{
ch: make(chan interface{}, 1024),
ch: make(chan any, 1024),
eng: eng,
connections: make(map[*conn]struct{}),
eventHandler: eh,
Expand Down Expand Up @@ -121,7 +121,7 @@ func (cli *Client) Dial(network, addr string) (Conn, error) {
return cli.DialContext(network, addr, nil)
}

func (cli *Client) DialContext(network, addr string, ctx interface{}) (Conn, error) {
func (cli *Client) DialContext(network, addr string, ctx any) (Conn, error) {
var (
c net.Conn
err error
Expand All @@ -146,7 +146,7 @@ func (cli *Client) Enroll(nc net.Conn) (gc Conn, err error) {
return cli.EnrollContext(nc, nil)
}

func (cli *Client) EnrollContext(nc net.Conn, ctx interface{}) (gc Conn, err error) {
func (cli *Client) EnrollContext(nc net.Conn, ctx any) (gc Conn, err error) {
connOpened := make(chan struct{})
switch v := nc.(type) {
case *net.TCPConn:
Expand Down
24 changes: 12 additions & 12 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
type conn struct {
fd int // file descriptor
gfd gfd.GFD // gnet file descriptor
ctx interface{} // user-defined context
ctx any // user-defined context
remote unix.Sockaddr // remote socket address
localAddr net.Addr // local addr
remoteAddr net.Addr // remote addr
Expand Down Expand Up @@ -243,8 +243,8 @@ type asyncWriteHook struct {
data []byte
}

func (c *conn) asyncWrite(itf interface{}) (err error) {
hook := itf.(*asyncWriteHook)
func (c *conn) asyncWrite(a any) (err error) {
hook := a.(*asyncWriteHook)
defer func() {
if hook.callback != nil {
_ = hook.callback(c, err)
Expand All @@ -264,8 +264,8 @@ type asyncWritevHook struct {
data [][]byte
}

func (c *conn) asyncWritev(itf interface{}) (err error) {
hook := itf.(*asyncWritevHook)
func (c *conn) asyncWritev(a any) (err error) {
hook := a.(*asyncWritevHook)
defer func() {
if hook.callback != nil {
_ = hook.callback(c, err)
Expand Down Expand Up @@ -439,10 +439,10 @@ func (c *conn) OutboundBuffered() int {
return c.outboundBuffer.Buffered()
}

func (c *conn) Context() interface{} { return c.ctx }
func (c *conn) SetContext(ctx interface{}) { c.ctx = ctx }
func (c *conn) LocalAddr() net.Addr { return c.localAddr }
func (c *conn) RemoteAddr() net.Addr { return c.remoteAddr }
func (c *conn) Context() any { return c.ctx }
func (c *conn) SetContext(ctx any) { c.ctx = ctx }
func (c *conn) LocalAddr() net.Addr { return c.localAddr }
func (c *conn) RemoteAddr() net.Addr { return c.remoteAddr }

// Implementation of Socket interface

Expand Down Expand Up @@ -489,7 +489,7 @@ func (c *conn) AsyncWritev(bs [][]byte, callback AsyncCallback) error {
}

func (c *conn) Wake(callback AsyncCallback) error {
return c.loop.poller.Trigger(queue.LowPriority, func(_ interface{}) (err error) {
return c.loop.poller.Trigger(queue.LowPriority, func(_ any) (err error) {
err = c.loop.wake(c)
if callback != nil {
_ = callback(c, err)
Expand All @@ -499,7 +499,7 @@ func (c *conn) Wake(callback AsyncCallback) error {
}

func (c *conn) CloseWithCallback(callback AsyncCallback) error {
return c.loop.poller.Trigger(queue.LowPriority, func(_ interface{}) (err error) {
return c.loop.poller.Trigger(queue.LowPriority, func(_ any) (err error) {
err = c.loop.close(c, nil)
if callback != nil {
_ = callback(c, err)
Expand All @@ -509,7 +509,7 @@ func (c *conn) CloseWithCallback(callback AsyncCallback) error {
}

func (c *conn) Close() error {
return c.loop.poller.Trigger(queue.LowPriority, func(_ interface{}) (err error) {
return c.loop.poller.Trigger(queue.LowPriority, func(_ any) (err error) {
err = c.loop.close(c, nil)
return
}, nil)
Expand Down
10 changes: 5 additions & 5 deletions connection_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type openConn struct {

type conn struct {
pc net.PacketConn
ctx interface{} // user-defined context
ctx any // user-defined context
loop *eventloop // owner event-loop
buffer *bbPool.ByteBuffer // reuse memory of inbound data as a temporary buffer
rawConn net.Conn // original connection
Expand Down Expand Up @@ -271,10 +271,10 @@ func (c *conn) OutboundBuffered() int {
return 0
}

func (c *conn) Context() interface{} { return c.ctx }
func (c *conn) SetContext(ctx interface{}) { c.ctx = ctx }
func (c *conn) LocalAddr() net.Addr { return c.localAddr }
func (c *conn) RemoteAddr() net.Addr { return c.remoteAddr }
func (c *conn) Context() any { return c.ctx }
func (c *conn) SetContext(ctx any) { c.ctx = ctx }
func (c *conn) LocalAddr() net.Addr { return c.localAddr }
func (c *conn) RemoteAddr() net.Addr { return c.remoteAddr }

func (c *conn) Fd() (fd int) {
if c.rawConn == nil {
Expand Down
4 changes: 2 additions & 2 deletions engine_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,14 @@ func (eng *engine) stop(s Engine) {

// Notify all event-loops to exit.
eng.eventLoops.iterate(func(i int, el *eventloop) bool {
err := el.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil)
err := el.poller.Trigger(queue.HighPriority, func(_ any) error { return errorx.ErrEngineShutdown }, nil)
if err != nil {
eng.opts.Logger.Errorf("failed to enqueue shutdown signal of high-priority for event-loop(%d): %v", i, err)
}
return true
})
if eng.ingress != nil {
err := eng.ingress.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil)
err := eng.ingress.poller.Trigger(queue.HighPriority, func(_ any) error { return errorx.ErrEngineShutdown }, nil)
if err != nil {
eng.opts.Logger.Errorf("failed to enqueue shutdown signal of high-priority for main event-loop: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion engine_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (eng *engine) closeEventLoops() {
func (eng *engine) start(numEventLoop int) error {
for i := 0; i < numEventLoop; i++ {
el := eventloop{
ch: make(chan interface{}, 1024),
ch: make(chan any, 1024),
idx: i,
eng: eng,
connections: make(map[*conn]struct{}),
Expand Down
24 changes: 12 additions & 12 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ type connWithCallback struct {
cb func()
}

func (el *eventloop) register(itf interface{}) error {
c, ok := itf.(*conn)
func (el *eventloop) register(a any) error {
c, ok := a.(*conn)
if !ok {
ccb := itf.(*connWithCallback)
ccb := a.(*connWithCallback)
c = ccb.c
defer ccb.cb()
}
Expand Down Expand Up @@ -114,8 +114,8 @@ func (el *eventloop) open(c *conn) error {
return el.handleAction(c, action)
}

func (el *eventloop) read0(itf interface{}) error {
return el.read(itf.(*conn))
func (el *eventloop) read0(a any) error {
return el.read(a.(*conn))
}

func (el *eventloop) read(c *conn) error {
Expand Down Expand Up @@ -166,8 +166,8 @@ loop:
return nil
}

func (el *eventloop) write0(itf interface{}) error {
return el.write(itf.(*conn))
func (el *eventloop) write0(a any) error {
return el.write(a.(*conn))
}

// The default value of UIO_MAXIOV/IOV_MAX is 1024 on Linux and most BSD-like OSs.
Expand Down Expand Up @@ -297,7 +297,7 @@ func (el *eventloop) ticker(ctx context.Context) {
case Shutdown:
// It seems reasonable to mark this as low-priority, waiting for some tasks like asynchronous writes
// to finish up before shutting down the service.
err := el.poller.Trigger(queue.LowPriority, func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil)
err := el.poller.Trigger(queue.LowPriority, func(_ any) error { return errorx.ErrEngineShutdown }, nil)
el.getLogger().Debugf("failed to enqueue shutdown signal of high-priority for event-loop(%d): %v", el.idx, err)
}
if timer == nil {
Expand Down Expand Up @@ -354,8 +354,8 @@ func (el *eventloop) handleAction(c *conn, action Action) error {
}

/*
func (el *eventloop) execCmd(itf interface{}) (err error) {
cmd := itf.(*asyncCmd)
func (el *eventloop) execCmd(a any) (err error) {
cmd := a.(*asyncCmd)
c := el.connections.getConnByGFD(cmd.fd)
if c == nil || c.gfd != cmd.fd {
return errorx.ErrInvalidConn
Expand All @@ -373,9 +373,9 @@ func (el *eventloop) execCmd(itf interface{}) (err error) {
case asyncCmdWake:
return el.wake(c)
case asyncCmdWrite:
_, err = c.Write(cmd.arg.([]byte))
_, err = c.Write(cmd.param.([]byte))
case asyncCmdWritev:
_, err = c.Writev(cmd.arg.([][]byte))
_, err = c.Writev(cmd.param.([][]byte))
default:
return errorx.ErrUnsupportedOp
}
Expand Down
2 changes: 1 addition & 1 deletion eventloop_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

type eventloop struct {
ch chan interface{} // channel for event-loop
ch chan any // channel for event-loop
idx int // index of event-loop in event-loops
eng *engine // engine in loop
cache bytes.Buffer // temporary buffer for scattered bytes
Expand Down
16 changes: 9 additions & 7 deletions gnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ type asyncCmd struct {
fd gfd.GFD
typ asyncCmdType
cb AsyncCallback
arg interface{}
param any
}
// AsyncWrite writes data to the given connection asynchronously.
Expand All @@ -135,7 +135,7 @@ func (e Engine) AsyncWrite(fd gfd.GFD, p []byte, cb AsyncCallback) error {
return err
}
return e.eng.sendCmd(&asyncCmd{fd: fd, typ: asyncCmdWrite, cb: cb, arg: p}, false)
return e.eng.sendCmd(&asyncCmd{fd: fd, typ: asyncCmdWrite, cb: cb, param: p}, false)
}
// AsyncWritev is like AsyncWrite, but it accepts a slice of byte slices.
Expand All @@ -144,7 +144,7 @@ func (e Engine) AsyncWritev(fd gfd.GFD, batch [][]byte, cb AsyncCallback) error
return err
}
return e.eng.sendCmd(&asyncCmd{fd: fd, typ: asyncCmdWritev, cb: cb, arg: batch}, false)
return e.eng.sendCmd(&asyncCmd{fd: fd, typ: asyncCmdWritev, cb: cb, param: batch}, false)
}
// Close closes the given connection.
Expand Down Expand Up @@ -237,9 +237,11 @@ type Writer interface {
AsyncWritev(bs [][]byte, callback AsyncCallback) (err error)
}

// AsyncCallback is a callback which will be invoked after the asynchronous functions has finished executing.
// AsyncCallback is a callback that will be invoked after the asynchronous function finishes.
//
// Note that the parameter gnet.Conn is already released under UDP protocol, thus it's not allowed to be accessed.
// Note that the parameter gnet.Conn might have been already released when it's UDP protocol,
// thus it shouldn't be accessed.
// This callback must not block, otherwise, it blocks the event-loop.
type AsyncCallback func(c Conn, err error) error

// Socket is a set of functions which manipulate the underlying file descriptor of a connection.
Expand Down Expand Up @@ -303,11 +305,11 @@ type Conn interface {

// Context returns a user-defined context, it's not concurrency-safe,
// you must invoke it within any method in EventHandler.
Context() (ctx interface{})
Context() (ctx any)

// SetContext sets a user-defined context, it's not concurrency-safe,
// you must invoke it within any method in EventHandler.
SetContext(ctx interface{})
SetContext(ctx any)

// LocalAddr is the connection's local socket address, it's not concurrency-safe,
// you must invoke it within any method in EventHandler.
Expand Down
8 changes: 4 additions & 4 deletions internal/netpoll/poller_epoll_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ var (
// any asks other than high-priority tasks will be shunted to asyncTaskQueue.
//
// Note that asyncTaskQueue is a queue of low-priority whose size may grow large and tasks in it may backlog.
func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg interface{}) (err error) {
func (p *Poller) Trigger(priority queue.EventPriority, fn queue.Func, param any) (err error) {
task := queue.GetTask()
task.Run, task.Arg = fn, arg
task.Exec, task.Param = fn, param
if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold {
p.asyncTaskQueue.Enqueue(task)
} else {
Expand Down Expand Up @@ -145,7 +145,7 @@ func (p *Poller) Polling(callback PollEventHandler) error {
doChores = false
task := p.urgentAsyncTaskQueue.Dequeue()
for ; task != nil; task = p.urgentAsyncTaskQueue.Dequeue() {
err = task.Run(task.Arg)
err = task.Exec(task.Param)
if errors.Is(err, errorx.ErrEngineShutdown) {
return err
}
Expand All @@ -155,7 +155,7 @@ func (p *Poller) Polling(callback PollEventHandler) error {
if task = p.asyncTaskQueue.Dequeue(); task == nil {
break
}
err = task.Run(task.Arg)
err = task.Exec(task.Param)
if errors.Is(err, errorx.ErrEngineShutdown) {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions internal/netpoll/poller_epoll_ultimate.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ var (
// any asks other than high-priority tasks will be shunted to asyncTaskQueue.
//
// Note that asyncTaskQueue is a queue of low-priority whose size may grow large and tasks in it may backlog.
func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg interface{}) (err error) {
func (p *Poller) Trigger(priority queue.EventPriority, fn queue.Func, param any) (err error) {
task := queue.GetTask()
task.Run, task.Arg = fn, arg
task.Exec, task.Param = fn, param
if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold {
p.asyncTaskQueue.Enqueue(task)
} else {
Expand Down Expand Up @@ -147,7 +147,7 @@ func (p *Poller) Polling() error {
doChores = false
task := p.urgentAsyncTaskQueue.Dequeue()
for ; task != nil; task = p.urgentAsyncTaskQueue.Dequeue() {
err = task.Run(task.Arg)
err = task.Exec(task.Param)
if errors.Is(err, errorx.ErrEngineShutdown) {
return err
}
Expand All @@ -157,7 +157,7 @@ func (p *Poller) Polling() error {
if task = p.asyncTaskQueue.Dequeue(); task == nil {
break
}
err = task.Run(task.Arg)
err = task.Exec(task.Param)
if errors.Is(err, errorx.ErrEngineShutdown) {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions internal/netpoll/poller_kqueue_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ func (p *Poller) Close() error {
// any asks other than high-priority tasks will be shunted to asyncTaskQueue.
//
// Note that asyncTaskQueue is a queue of low-priority whose size may grow large and tasks in it may backlog.
func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg interface{}) (err error) {
func (p *Poller) Trigger(priority queue.EventPriority, fn queue.Func, param any) (err error) {
task := queue.GetTask()
task.Run, task.Arg = fn, arg
task.Exec, task.Param = fn, param
if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold {
p.asyncTaskQueue.Enqueue(task)
} else {
Expand Down Expand Up @@ -130,7 +130,7 @@ func (p *Poller) Polling(callback PollEventHandler) error {
doChores = false
task := p.urgentAsyncTaskQueue.Dequeue()
for ; task != nil; task = p.urgentAsyncTaskQueue.Dequeue() {
err = task.Run(task.Arg)
err = task.Exec(task.Param)
if errors.Is(err, errorx.ErrEngineShutdown) {
return err
}
Expand All @@ -140,7 +140,7 @@ func (p *Poller) Polling(callback PollEventHandler) error {
if task = p.asyncTaskQueue.Dequeue(); task == nil {
break
}
err = task.Run(task.Arg)
err = task.Exec(task.Param)
if errors.Is(err, errorx.ErrEngineShutdown) {
return err
}
Expand Down
Loading

0 comments on commit 47ab6a3

Please sign in to comment.