Skip to content

Commit

Permalink
opt: mitigate the latency issue by prioritizing asynchronous writes
Browse files Browse the repository at this point in the history
Fixes #423
  • Loading branch information
panjf2000 committed Mar 30, 2024
1 parent f7cfb5b commit bb32a8c
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 115 deletions.
3 changes: 2 additions & 1 deletion acceptor_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"golang.org/x/sys/unix"

"github.com/panjf2000/gnet/v2/internal/netpoll"
"github.com/panjf2000/gnet/v2/internal/queue"
"github.com/panjf2000/gnet/v2/internal/socket"
"github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
Expand Down Expand Up @@ -51,7 +52,7 @@ func (eng *engine) accept1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {

el := eng.eventLoops.next(remoteAddr)
c := newTCPConn(nfd, el, sa, el.ln.addr, remoteAddr)
err = el.poller.UrgentTrigger(el.register, c)
err = el.poller.Trigger(queue.HighPriority, el.register, c)
if err != nil {
eng.opts.Logger.Errorf("UrgentTrigger() failed due to error: %v", err)
_ = unix.Close(nfd)
Expand Down
5 changes: 3 additions & 2 deletions client_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/panjf2000/gnet/v2/internal/math"
"github.com/panjf2000/gnet/v2/internal/netpoll"
"github.com/panjf2000/gnet/v2/internal/queue"
"github.com/panjf2000/gnet/v2/internal/socket"
"github.com/panjf2000/gnet/v2/pkg/buffer/ring"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
Expand Down Expand Up @@ -126,7 +127,7 @@ func (cli *Client) Start() error {

// Stop stops the client event-loop.
func (cli *Client) Stop() (err error) {
logging.Error(cli.el.poller.UrgentTrigger(func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil))
logging.Error(cli.el.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil))
// Stop the ticker.
if cli.opts.Ticker {
cli.el.engine.ticker.cancel()
Expand Down Expand Up @@ -233,7 +234,7 @@ func (cli *Client) EnrollContext(c net.Conn, ctx interface{}) (Conn, error) {
ccb := &connWithCallback{c: gc, cb: func() {
close(connOpened)
}}
err = cli.el.poller.UrgentTrigger(cli.el.register, ccb)
err = cli.el.poller.Trigger(queue.HighPriority, cli.el.register, ccb)
if err != nil {
gc.Close()
return nil, err
Expand Down
11 changes: 6 additions & 5 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/panjf2000/gnet/v2/internal/gfd"
gio "github.com/panjf2000/gnet/v2/internal/io"
"github.com/panjf2000/gnet/v2/internal/netpoll"
"github.com/panjf2000/gnet/v2/internal/queue"
"github.com/panjf2000/gnet/v2/internal/socket"
"github.com/panjf2000/gnet/v2/pkg/buffer/elastic"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
Expand Down Expand Up @@ -442,18 +443,18 @@ func (c *conn) AsyncWrite(buf []byte, callback AsyncCallback) error {
}
return err
}
return c.loop.poller.Trigger(c.asyncWrite, &asyncWriteHook{callback, buf})
return c.loop.poller.Trigger(queue.HighPriority, c.asyncWrite, &asyncWriteHook{callback, buf})
}

func (c *conn) AsyncWritev(bs [][]byte, callback AsyncCallback) error {
if c.isDatagram {
return errorx.ErrUnsupportedOp
}
return c.loop.poller.Trigger(c.asyncWritev, &asyncWritevHook{callback, bs})
return c.loop.poller.Trigger(queue.HighPriority, c.asyncWritev, &asyncWritevHook{callback, bs})
}

func (c *conn) Wake(callback AsyncCallback) error {
return c.loop.poller.UrgentTrigger(func(_ interface{}) (err error) {
return c.loop.poller.Trigger(queue.LowPriority, func(_ interface{}) (err error) {
err = c.loop.wake(c)
if callback != nil {
_ = callback(c, err)
Expand All @@ -463,7 +464,7 @@ func (c *conn) Wake(callback AsyncCallback) error {
}

func (c *conn) CloseWithCallback(callback AsyncCallback) error {
return c.loop.poller.Trigger(func(_ interface{}) (err error) {
return c.loop.poller.Trigger(queue.LowPriority, func(_ interface{}) (err error) {
err = c.loop.close(c, nil)
if callback != nil {
_ = callback(c, err)
Expand All @@ -473,7 +474,7 @@ func (c *conn) CloseWithCallback(callback AsyncCallback) error {
}

func (c *conn) Close() error {
return c.loop.poller.Trigger(func(_ interface{}) (err error) {
return c.loop.poller.Trigger(queue.LowPriority, func(_ interface{}) (err error) {
err = c.loop.close(c, nil)
return
}, nil)
Expand Down
5 changes: 3 additions & 2 deletions engine_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/panjf2000/gnet/v2/internal/gfd"
"github.com/panjf2000/gnet/v2/internal/netpoll"
"github.com/panjf2000/gnet/v2/internal/queue"
"github.com/panjf2000/gnet/v2/pkg/errors"
)

Expand Down Expand Up @@ -203,14 +204,14 @@ func (eng *engine) stop(s Engine) {

// Notify all event-loops to exit.
eng.eventLoops.iterate(func(_ int, el *eventloop) bool {
err := el.poller.UrgentTrigger(func(_ interface{}) error { return errors.ErrEngineShutdown }, nil)
err := el.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errors.ErrEngineShutdown }, nil)
if err != nil {
eng.opts.Logger.Errorf("failed to call UrgentTrigger on sub event-loop when stopping engine: %v", err)
}
return true
})
if eng.acceptor != nil {
err := eng.acceptor.poller.UrgentTrigger(func(_ interface{}) error { return errors.ErrEngineShutdown }, nil)
err := eng.acceptor.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errors.ErrEngineShutdown }, nil)
if err != nil {
eng.opts.Logger.Errorf("failed to call UrgentTrigger on main event-loop when stopping engine: %v", err)
}
Expand Down
5 changes: 4 additions & 1 deletion eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/panjf2000/gnet/v2/internal/io"
"github.com/panjf2000/gnet/v2/internal/netpoll"
"github.com/panjf2000/gnet/v2/internal/queue"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
)
Expand Down Expand Up @@ -253,7 +254,9 @@ func (el *eventloop) ticker(ctx context.Context) {
switch action {
case None:
case Shutdown:
err := el.poller.UrgentTrigger(func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil)
// It seems reasonable to mark this as low-priority, waiting for some tasks like asynchronous writes
// to finish up before shutting down the service.
err := el.poller.Trigger(queue.LowPriority, func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil)
el.getLogger().Debugf("stopping ticker in event-loop(%d) from OnTick(), UrgentTrigger:%v", el.idx, err)
}
if timer == nil {
Expand Down
45 changes: 18 additions & 27 deletions internal/netpoll/epoll_default_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ import (

// Poller represents a poller which is in charge of monitoring file-descriptors.
type Poller struct {
fd int // epoll fd
efd int // eventfd
efdBuf []byte // efd buffer to read an 8-byte integer
wakeupCall int32
asyncTaskQueue queue.AsyncTaskQueue // queue with low priority
urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority
fd int // epoll fd
efd int // eventfd
efdBuf []byte // efd buffer to read an 8-byte integer
wakeupCall int32
asyncTaskQueue queue.AsyncTaskQueue // queue with low priority
urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority
highPriorityEventsThreshold int32 // threshold of high-priority events
}

// OpenPoller instantiates a poller.
Expand All @@ -63,6 +64,7 @@ func OpenPoller() (poller *Poller, err error) {
}
poller.asyncTaskQueue = queue.NewLockFreeQueue()
poller.urgentAsyncTaskQueue = queue.NewLockFreeQueue()
poller.highPriorityEventsThreshold = MaxPollEventsCap
return
}

Expand All @@ -81,31 +83,20 @@ var (
b = (*(*[8]byte)(unsafe.Pointer(&u)))[:]
)

// UrgentTrigger puts task into urgentAsyncTaskQueue and wakes up the poller which is waiting for network-events,
// then the poller will get tasks from urgentAsyncTaskQueue and run them.
// Trigger enqueues task and wakes up the poller to process pending tasks.
// By default, any incoming task will enqueued into urgentAsyncTaskQueue
// before the threshold of high-priority events is reached. When it happens,
// any asks other than high-priority tasks will be shunted to asyncTaskQueue.
//
// Note that urgentAsyncTaskQueue is a queue with high-priority and its size is expected to be small,
// so only those urgent tasks should be put into this queue.
func (p *Poller) UrgentTrigger(fn queue.TaskFunc, arg interface{}) (err error) {
// Note that asyncTaskQueue is a queue of low-priority whose size may grow large and tasks in it may backlog.
func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg interface{}) (err error) {
task := queue.GetTask()
task.Run, task.Arg = fn, arg
p.urgentAsyncTaskQueue.Enqueue(task)
if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) {
if _, err = unix.Write(p.efd, b); err == unix.EAGAIN {
err = nil
}
if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold {
p.asyncTaskQueue.Enqueue(task)
} else {
p.urgentAsyncTaskQueue.Enqueue(task)
}
return os.NewSyscallError("write", err)
}

// Trigger is like UrgentTrigger but it puts task into asyncTaskQueue,
// call this method when the task is not so urgent, for instance writing data back to the peer.
//
// Note that asyncTaskQueue is a queue with low-priority whose size may grow large and tasks in it may backlog.
func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) {
task := queue.GetTask()
task.Run, task.Arg = fn, arg
p.asyncTaskQueue.Enqueue(task)
if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) {
if _, err = unix.Write(p.efd, b); err == unix.EAGAIN {
err = nil
Expand Down
45 changes: 18 additions & 27 deletions internal/netpoll/epoll_optimized_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ import (

// Poller represents a poller which is in charge of monitoring file-descriptors.
type Poller struct {
fd int // epoll fd
epa *PollAttachment // PollAttachment for waking events
efdBuf []byte // efd buffer to read an 8-byte integer
wakeupCall int32
asyncTaskQueue queue.AsyncTaskQueue // queue with low priority
urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority
fd int // epoll fd
epa *PollAttachment // PollAttachment for waking events
efdBuf []byte // efd buffer to read an 8-byte integer
wakeupCall int32
asyncTaskQueue queue.AsyncTaskQueue // queue with low priority
urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority
highPriorityEventsThreshold int32 // threshold of high-priority events
}

// OpenPoller instantiates a poller.
Expand All @@ -64,6 +65,7 @@ func OpenPoller() (poller *Poller, err error) {
}
poller.asyncTaskQueue = queue.NewLockFreeQueue()
poller.urgentAsyncTaskQueue = queue.NewLockFreeQueue()
poller.highPriorityEventsThreshold = MaxPollEventsCap
return
}

Expand All @@ -82,31 +84,20 @@ var (
b = (*(*[8]byte)(unsafe.Pointer(&u)))[:]
)

// UrgentTrigger puts task into urgentAsyncTaskQueue and wakes up the poller which is waiting for network-events,
// then the poller will get tasks from urgentAsyncTaskQueue and run them.
// Trigger enqueues task and wakes up the poller to process pending tasks.
// By default, any incoming task will enqueued into urgentAsyncTaskQueue
// before the threshold of high-priority events is reached. When it happens,
// any asks other than high-priority tasks will be shunted to asyncTaskQueue.
//
// Note that urgentAsyncTaskQueue is a queue with high-priority and its size is expected to be small,
// so only those urgent tasks should be put into this queue.
func (p *Poller) UrgentTrigger(fn queue.TaskFunc, arg interface{}) (err error) {
// Note that asyncTaskQueue is a queue of low-priority whose size may grow large and tasks in it may backlog.
func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg interface{}) (err error) {
task := queue.GetTask()
task.Run, task.Arg = fn, arg
p.urgentAsyncTaskQueue.Enqueue(task)
if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) {
if _, err = unix.Write(p.epa.FD, b); err == unix.EAGAIN {
err = nil
}
if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold {
p.asyncTaskQueue.Enqueue(task)
} else {
p.urgentAsyncTaskQueue.Enqueue(task)
}
return os.NewSyscallError("write", err)
}

// Trigger is like UrgentTrigger but it puts task into asyncTaskQueue,
// call this method when the task is not so urgent, for instance writing data back to the peer.
//
// Note that asyncTaskQueue is a queue with low-priority whose size may grow large and tasks in it may backlog.
func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) {
task := queue.GetTask()
task.Run, task.Arg = fn, arg
p.asyncTaskQueue.Enqueue(task)
if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) {
if _, err = unix.Write(p.epa.FD, b); err == unix.EAGAIN {
err = nil
Expand Down
41 changes: 16 additions & 25 deletions internal/netpoll/kqueue_default_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ import (

// Poller represents a poller which is in charge of monitoring file-descriptors.
type Poller struct {
fd int
wakeupCall int32
asyncTaskQueue queue.AsyncTaskQueue // queue with low priority
urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority
fd int
wakeupCall int32
asyncTaskQueue queue.AsyncTaskQueue // queue with low priority
urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority
highPriorityEventsThreshold int32 // threshold of high-priority events
}

// OpenPoller instantiates a poller.
Expand All @@ -58,6 +59,7 @@ func OpenPoller() (poller *Poller, err error) {
}
poller.asyncTaskQueue = queue.NewLockFreeQueue()
poller.urgentAsyncTaskQueue = queue.NewLockFreeQueue()
poller.highPriorityEventsThreshold = MaxPollEventsCap
return
}

Expand All @@ -72,31 +74,20 @@ var note = []unix.Kevent_t{{
Fflags: unix.NOTE_TRIGGER,
}}

// UrgentTrigger puts task into urgentAsyncTaskQueue and wakes up the poller which is waiting for network-events,
// then the poller will get tasks from urgentAsyncTaskQueue and run them.
// Trigger enqueues task and wakes up the poller to process pending tasks.
// By default, any incoming task will enqueued into urgentAsyncTaskQueue
// before the threshold of high-priority events is reached. When it happens,
// any asks other than high-priority tasks will be shunted to asyncTaskQueue.
//
// Note that urgentAsyncTaskQueue is a queue with high-priority and its size is expected to be small,
// so only those urgent tasks should be put into this queue.
func (p *Poller) UrgentTrigger(fn queue.TaskFunc, arg interface{}) (err error) {
// Note that asyncTaskQueue is a queue of low-priority whose size may grow large and tasks in it may backlog.
func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg interface{}) (err error) {
task := queue.GetTask()
task.Run, task.Arg = fn, arg
p.urgentAsyncTaskQueue.Enqueue(task)
if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) {
if _, err = unix.Kevent(p.fd, note, nil, nil); err == unix.EAGAIN {
err = nil
}
if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold {
p.asyncTaskQueue.Enqueue(task)
} else {
p.urgentAsyncTaskQueue.Enqueue(task)
}
return os.NewSyscallError("kevent trigger", err)
}

// Trigger is like UrgentTrigger but it puts task into asyncTaskQueue,
// call this method when the task is not so urgent, for instance writing data back to the peer.
//
// Note that asyncTaskQueue is a queue with low-priority whose size may grow large and tasks in it may backlog.
func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) {
task := queue.GetTask()
task.Run, task.Arg = fn, arg
p.asyncTaskQueue.Enqueue(task)
if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) {
if _, err = unix.Kevent(p.fd, note, nil, nil); err == unix.EAGAIN {
err = nil
Expand Down
Loading

0 comments on commit bb32a8c

Please sign in to comment.