Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: rename some parameters #656

Merged
merged 1 commit into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions client_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (cli *Client) Start() error {

// Stop stops the client event-loop.
func (cli *Client) Stop() (err error) {
logging.Error(cli.el.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil))
logging.Error(cli.el.poller.Trigger(queue.HighPriority, func(_ any) error { return errorx.ErrEngineShutdown }, nil))
// Stop the ticker.
if cli.opts.Ticker {
cli.el.engine.ticker.cancel()
Expand All @@ -153,7 +153,7 @@ func (cli *Client) Dial(network, address string) (Conn, error) {
}

// DialContext is like Dial but also accepts an empty interface ctx that can be obtained later via Conn.Context.
func (cli *Client) DialContext(network, address string, ctx interface{}) (Conn, error) {
func (cli *Client) DialContext(network, address string, ctx any) (Conn, error) {
c, err := net.Dial(network, address)
if err != nil {
return nil, err
Expand All @@ -167,7 +167,7 @@ func (cli *Client) Enroll(c net.Conn) (Conn, error) {
}

// EnrollContext is like Enroll but also accepts an empty interface ctx that can be obtained later via Conn.Context.
func (cli *Client) EnrollContext(c net.Conn, ctx interface{}) (Conn, error) {
func (cli *Client) EnrollContext(c net.Conn, ctx any) (Conn, error) {
defer c.Close()

sc, ok := c.(syscall.Conn)
Expand Down
6 changes: 3 additions & 3 deletions client_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
eventHandler: eh,
}
cli.el = &eventloop{
ch: make(chan interface{}, 1024),
ch: make(chan any, 1024),
eng: eng,
connections: make(map[*conn]struct{}),
eventHandler: eh,
Expand Down Expand Up @@ -121,7 +121,7 @@ func (cli *Client) Dial(network, addr string) (Conn, error) {
return cli.DialContext(network, addr, nil)
}

func (cli *Client) DialContext(network, addr string, ctx interface{}) (Conn, error) {
func (cli *Client) DialContext(network, addr string, ctx any) (Conn, error) {
var (
c net.Conn
err error
Expand All @@ -146,7 +146,7 @@ func (cli *Client) Enroll(nc net.Conn) (gc Conn, err error) {
return cli.EnrollContext(nc, nil)
}

func (cli *Client) EnrollContext(nc net.Conn, ctx interface{}) (gc Conn, err error) {
func (cli *Client) EnrollContext(nc net.Conn, ctx any) (gc Conn, err error) {
connOpened := make(chan struct{})
switch v := nc.(type) {
case *net.TCPConn:
Expand Down
24 changes: 12 additions & 12 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
type conn struct {
fd int // file descriptor
gfd gfd.GFD // gnet file descriptor
ctx interface{} // user-defined context
ctx any // user-defined context
remote unix.Sockaddr // remote socket address
localAddr net.Addr // local addr
remoteAddr net.Addr // remote addr
Expand Down Expand Up @@ -243,8 +243,8 @@ type asyncWriteHook struct {
data []byte
}

func (c *conn) asyncWrite(itf interface{}) (err error) {
hook := itf.(*asyncWriteHook)
func (c *conn) asyncWrite(a any) (err error) {
hook := a.(*asyncWriteHook)
defer func() {
if hook.callback != nil {
_ = hook.callback(c, err)
Expand All @@ -264,8 +264,8 @@ type asyncWritevHook struct {
data [][]byte
}

func (c *conn) asyncWritev(itf interface{}) (err error) {
hook := itf.(*asyncWritevHook)
func (c *conn) asyncWritev(a any) (err error) {
hook := a.(*asyncWritevHook)
defer func() {
if hook.callback != nil {
_ = hook.callback(c, err)
Expand Down Expand Up @@ -439,10 +439,10 @@ func (c *conn) OutboundBuffered() int {
return c.outboundBuffer.Buffered()
}

func (c *conn) Context() interface{} { return c.ctx }
func (c *conn) SetContext(ctx interface{}) { c.ctx = ctx }
func (c *conn) LocalAddr() net.Addr { return c.localAddr }
func (c *conn) RemoteAddr() net.Addr { return c.remoteAddr }
func (c *conn) Context() any { return c.ctx }
func (c *conn) SetContext(ctx any) { c.ctx = ctx }
func (c *conn) LocalAddr() net.Addr { return c.localAddr }
func (c *conn) RemoteAddr() net.Addr { return c.remoteAddr }

// Implementation of Socket interface

Expand Down Expand Up @@ -489,7 +489,7 @@ func (c *conn) AsyncWritev(bs [][]byte, callback AsyncCallback) error {
}

func (c *conn) Wake(callback AsyncCallback) error {
return c.loop.poller.Trigger(queue.LowPriority, func(_ interface{}) (err error) {
return c.loop.poller.Trigger(queue.LowPriority, func(_ any) (err error) {
err = c.loop.wake(c)
if callback != nil {
_ = callback(c, err)
Expand All @@ -499,7 +499,7 @@ func (c *conn) Wake(callback AsyncCallback) error {
}

func (c *conn) CloseWithCallback(callback AsyncCallback) error {
return c.loop.poller.Trigger(queue.LowPriority, func(_ interface{}) (err error) {
return c.loop.poller.Trigger(queue.LowPriority, func(_ any) (err error) {
err = c.loop.close(c, nil)
if callback != nil {
_ = callback(c, err)
Expand All @@ -509,7 +509,7 @@ func (c *conn) CloseWithCallback(callback AsyncCallback) error {
}

func (c *conn) Close() error {
return c.loop.poller.Trigger(queue.LowPriority, func(_ interface{}) (err error) {
return c.loop.poller.Trigger(queue.LowPriority, func(_ any) (err error) {
err = c.loop.close(c, nil)
return
}, nil)
Expand Down
10 changes: 5 additions & 5 deletions connection_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type openConn struct {

type conn struct {
pc net.PacketConn
ctx interface{} // user-defined context
ctx any // user-defined context
loop *eventloop // owner event-loop
buffer *bbPool.ByteBuffer // reuse memory of inbound data as a temporary buffer
rawConn net.Conn // original connection
Expand Down Expand Up @@ -271,10 +271,10 @@ func (c *conn) OutboundBuffered() int {
return 0
}

func (c *conn) Context() interface{} { return c.ctx }
func (c *conn) SetContext(ctx interface{}) { c.ctx = ctx }
func (c *conn) LocalAddr() net.Addr { return c.localAddr }
func (c *conn) RemoteAddr() net.Addr { return c.remoteAddr }
func (c *conn) Context() any { return c.ctx }
func (c *conn) SetContext(ctx any) { c.ctx = ctx }
func (c *conn) LocalAddr() net.Addr { return c.localAddr }
func (c *conn) RemoteAddr() net.Addr { return c.remoteAddr }

func (c *conn) Fd() (fd int) {
if c.rawConn == nil {
Expand Down
4 changes: 2 additions & 2 deletions engine_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,14 @@ func (eng *engine) stop(s Engine) {

// Notify all event-loops to exit.
eng.eventLoops.iterate(func(i int, el *eventloop) bool {
err := el.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil)
err := el.poller.Trigger(queue.HighPriority, func(_ any) error { return errorx.ErrEngineShutdown }, nil)
if err != nil {
eng.opts.Logger.Errorf("failed to enqueue shutdown signal of high-priority for event-loop(%d): %v", i, err)
}
return true
})
if eng.ingress != nil {
err := eng.ingress.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil)
err := eng.ingress.poller.Trigger(queue.HighPriority, func(_ any) error { return errorx.ErrEngineShutdown }, nil)
if err != nil {
eng.opts.Logger.Errorf("failed to enqueue shutdown signal of high-priority for main event-loop: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion engine_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (eng *engine) closeEventLoops() {
func (eng *engine) start(numEventLoop int) error {
for i := 0; i < numEventLoop; i++ {
el := eventloop{
ch: make(chan interface{}, 1024),
ch: make(chan any, 1024),
idx: i,
eng: eng,
connections: make(map[*conn]struct{}),
Expand Down
24 changes: 12 additions & 12 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ type connWithCallback struct {
cb func()
}

func (el *eventloop) register(itf interface{}) error {
c, ok := itf.(*conn)
func (el *eventloop) register(a any) error {
c, ok := a.(*conn)
if !ok {
ccb := itf.(*connWithCallback)
ccb := a.(*connWithCallback)
c = ccb.c
defer ccb.cb()
}
Expand Down Expand Up @@ -114,8 +114,8 @@ func (el *eventloop) open(c *conn) error {
return el.handleAction(c, action)
}

func (el *eventloop) read0(itf interface{}) error {
return el.read(itf.(*conn))
func (el *eventloop) read0(a any) error {
return el.read(a.(*conn))
}

func (el *eventloop) read(c *conn) error {
Expand Down Expand Up @@ -166,8 +166,8 @@ loop:
return nil
}

func (el *eventloop) write0(itf interface{}) error {
return el.write(itf.(*conn))
func (el *eventloop) write0(a any) error {
return el.write(a.(*conn))
}

// The default value of UIO_MAXIOV/IOV_MAX is 1024 on Linux and most BSD-like OSs.
Expand Down Expand Up @@ -297,7 +297,7 @@ func (el *eventloop) ticker(ctx context.Context) {
case Shutdown:
// It seems reasonable to mark this as low-priority, waiting for some tasks like asynchronous writes
// to finish up before shutting down the service.
err := el.poller.Trigger(queue.LowPriority, func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil)
err := el.poller.Trigger(queue.LowPriority, func(_ any) error { return errorx.ErrEngineShutdown }, nil)
el.getLogger().Debugf("failed to enqueue shutdown signal of high-priority for event-loop(%d): %v", el.idx, err)
}
if timer == nil {
Expand Down Expand Up @@ -354,8 +354,8 @@ func (el *eventloop) handleAction(c *conn, action Action) error {
}

/*
func (el *eventloop) execCmd(itf interface{}) (err error) {
cmd := itf.(*asyncCmd)
func (el *eventloop) execCmd(a any) (err error) {
cmd := a.(*asyncCmd)
c := el.connections.getConnByGFD(cmd.fd)
if c == nil || c.gfd != cmd.fd {
return errorx.ErrInvalidConn
Expand All @@ -373,9 +373,9 @@ func (el *eventloop) execCmd(itf interface{}) (err error) {
case asyncCmdWake:
return el.wake(c)
case asyncCmdWrite:
_, err = c.Write(cmd.arg.([]byte))
_, err = c.Write(cmd.param.([]byte))
case asyncCmdWritev:
_, err = c.Writev(cmd.arg.([][]byte))
_, err = c.Writev(cmd.param.([][]byte))
default:
return errorx.ErrUnsupportedOp
}
Expand Down
2 changes: 1 addition & 1 deletion eventloop_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

type eventloop struct {
ch chan interface{} // channel for event-loop
ch chan any // channel for event-loop
idx int // index of event-loop in event-loops
eng *engine // engine in loop
cache bytes.Buffer // temporary buffer for scattered bytes
Expand Down
16 changes: 9 additions & 7 deletions gnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ type asyncCmd struct {
fd gfd.GFD
typ asyncCmdType
cb AsyncCallback
arg interface{}
param any
}
// AsyncWrite writes data to the given connection asynchronously.
Expand All @@ -135,7 +135,7 @@ func (e Engine) AsyncWrite(fd gfd.GFD, p []byte, cb AsyncCallback) error {
return err
}
return e.eng.sendCmd(&asyncCmd{fd: fd, typ: asyncCmdWrite, cb: cb, arg: p}, false)
return e.eng.sendCmd(&asyncCmd{fd: fd, typ: asyncCmdWrite, cb: cb, param: p}, false)
}
// AsyncWritev is like AsyncWrite, but it accepts a slice of byte slices.
Expand All @@ -144,7 +144,7 @@ func (e Engine) AsyncWritev(fd gfd.GFD, batch [][]byte, cb AsyncCallback) error
return err
}
return e.eng.sendCmd(&asyncCmd{fd: fd, typ: asyncCmdWritev, cb: cb, arg: batch}, false)
return e.eng.sendCmd(&asyncCmd{fd: fd, typ: asyncCmdWritev, cb: cb, param: batch}, false)
}
// Close closes the given connection.
Expand Down Expand Up @@ -237,9 +237,11 @@ type Writer interface {
AsyncWritev(bs [][]byte, callback AsyncCallback) (err error)
}

// AsyncCallback is a callback which will be invoked after the asynchronous functions has finished executing.
// AsyncCallback is a callback that will be invoked after the asynchronous function finishes.
//
// Note that the parameter gnet.Conn is already released under UDP protocol, thus it's not allowed to be accessed.
// Note that the parameter gnet.Conn might have been already released when it's UDP protocol,
// thus it shouldn't be accessed.
// This callback must not block, otherwise, it blocks the event-loop.
type AsyncCallback func(c Conn, err error) error

// Socket is a set of functions which manipulate the underlying file descriptor of a connection.
Expand Down Expand Up @@ -303,11 +305,11 @@ type Conn interface {

// Context returns a user-defined context, it's not concurrency-safe,
// you must invoke it within any method in EventHandler.
Context() (ctx interface{})
Context() (ctx any)

// SetContext sets a user-defined context, it's not concurrency-safe,
// you must invoke it within any method in EventHandler.
SetContext(ctx interface{})
SetContext(ctx any)

// LocalAddr is the connection's local socket address, it's not concurrency-safe,
// you must invoke it within any method in EventHandler.
Expand Down
8 changes: 4 additions & 4 deletions internal/netpoll/poller_epoll_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ var (
// any asks other than high-priority tasks will be shunted to asyncTaskQueue.
//
// Note that asyncTaskQueue is a queue of low-priority whose size may grow large and tasks in it may backlog.
func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg interface{}) (err error) {
func (p *Poller) Trigger(priority queue.EventPriority, fn queue.Func, param any) (err error) {
task := queue.GetTask()
task.Run, task.Arg = fn, arg
task.Exec, task.Param = fn, param
if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold {
p.asyncTaskQueue.Enqueue(task)
} else {
Expand Down Expand Up @@ -145,7 +145,7 @@ func (p *Poller) Polling(callback PollEventHandler) error {
doChores = false
task := p.urgentAsyncTaskQueue.Dequeue()
for ; task != nil; task = p.urgentAsyncTaskQueue.Dequeue() {
err = task.Run(task.Arg)
err = task.Exec(task.Param)
if errors.Is(err, errorx.ErrEngineShutdown) {
return err
}
Expand All @@ -155,7 +155,7 @@ func (p *Poller) Polling(callback PollEventHandler) error {
if task = p.asyncTaskQueue.Dequeue(); task == nil {
break
}
err = task.Run(task.Arg)
err = task.Exec(task.Param)
if errors.Is(err, errorx.ErrEngineShutdown) {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions internal/netpoll/poller_epoll_ultimate.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ var (
// any asks other than high-priority tasks will be shunted to asyncTaskQueue.
//
// Note that asyncTaskQueue is a queue of low-priority whose size may grow large and tasks in it may backlog.
func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg interface{}) (err error) {
func (p *Poller) Trigger(priority queue.EventPriority, fn queue.Func, param any) (err error) {
task := queue.GetTask()
task.Run, task.Arg = fn, arg
task.Exec, task.Param = fn, param
if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold {
p.asyncTaskQueue.Enqueue(task)
} else {
Expand Down Expand Up @@ -147,7 +147,7 @@ func (p *Poller) Polling() error {
doChores = false
task := p.urgentAsyncTaskQueue.Dequeue()
for ; task != nil; task = p.urgentAsyncTaskQueue.Dequeue() {
err = task.Run(task.Arg)
err = task.Exec(task.Param)
if errors.Is(err, errorx.ErrEngineShutdown) {
return err
}
Expand All @@ -157,7 +157,7 @@ func (p *Poller) Polling() error {
if task = p.asyncTaskQueue.Dequeue(); task == nil {
break
}
err = task.Run(task.Arg)
err = task.Exec(task.Param)
if errors.Is(err, errorx.ErrEngineShutdown) {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions internal/netpoll/poller_kqueue_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ func (p *Poller) Close() error {
// any asks other than high-priority tasks will be shunted to asyncTaskQueue.
//
// Note that asyncTaskQueue is a queue of low-priority whose size may grow large and tasks in it may backlog.
func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg interface{}) (err error) {
func (p *Poller) Trigger(priority queue.EventPriority, fn queue.Func, param any) (err error) {
task := queue.GetTask()
task.Run, task.Arg = fn, arg
task.Exec, task.Param = fn, param
if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold {
p.asyncTaskQueue.Enqueue(task)
} else {
Expand Down Expand Up @@ -130,7 +130,7 @@ func (p *Poller) Polling(callback PollEventHandler) error {
doChores = false
task := p.urgentAsyncTaskQueue.Dequeue()
for ; task != nil; task = p.urgentAsyncTaskQueue.Dequeue() {
err = task.Run(task.Arg)
err = task.Exec(task.Param)
if errors.Is(err, errorx.ErrEngineShutdown) {
return err
}
Expand All @@ -140,7 +140,7 @@ func (p *Poller) Polling(callback PollEventHandler) error {
if task = p.asyncTaskQueue.Dequeue(); task == nil {
break
}
err = task.Run(task.Arg)
err = task.Exec(task.Param)
if errors.Is(err, errorx.ErrEngineShutdown) {
return err
}
Expand Down
Loading
Loading