Skip to content

Commit

Permalink
opt: renovate the concurrency management of gnet engine
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Dec 4, 2024
1 parent 451f015 commit 213e164
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 144 deletions.
30 changes: 13 additions & 17 deletions client_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"errors"
"net"
"strconv"
"sync"
"syscall"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -66,20 +65,17 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
return
}

shutdownCtx, shutdown := context.WithCancel(context.Background())
rootCtx, shutdown := context.WithCancel(context.Background())
eg, ctx := errgroup.WithContext(rootCtx)
eng := engine{
listeners: make(map[int]*listener),
opts: options,
turnOff: shutdown,
eventHandler: eh,
workerPool: struct {
concurrency: struct {
*errgroup.Group
shutdownCtx context.Context
shutdown context.CancelFunc
once sync.Once
}{&errgroup.Group{}, shutdownCtx, shutdown, sync.Once{}},
}
if options.Ticker {
eng.ticker.ctx, eng.ticker.cancel = context.WithCancel(context.Background())
ctx context.Context
}{eg, ctx},
}
el := eventloop{
listeners: eng.listeners,
Expand Down Expand Up @@ -124,10 +120,14 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
func (cli *Client) Start() error {
logging.Infof("Starting gnet client with 1 event-loop")
cli.el.eventHandler.OnBoot(Engine{cli.el.engine})
cli.el.engine.workerPool.Go(cli.el.run)
cli.el.engine.concurrency.Go(cli.el.run)
// Start the ticker.
if cli.opts.Ticker {
go cli.el.ticker(cli.el.engine.ticker.ctx)
ctx := cli.el.engine.concurrency.ctx
cli.el.engine.concurrency.Go(func() error {
cli.el.ticker(ctx)
return nil
})
}
logging.Debugf("default logging level is %s", logging.LogLevel())
return nil
Expand All @@ -136,11 +136,7 @@ func (cli *Client) Start() error {
// Stop stops the client event-loop.
func (cli *Client) Stop() (err error) {
logging.Error(cli.el.poller.Trigger(queue.HighPriority, func(_ any) error { return errorx.ErrEngineShutdown }, nil))
// Stop the ticker.
if cli.opts.Ticker {
cli.el.engine.ticker.cancel()
}
_ = cli.el.engine.workerPool.Wait()
err = cli.el.engine.concurrency.Wait()
logging.Error(cli.el.poller.Close())
cli.el.eventHandler.OnShutdown(Engine{cli.el.engine})
logging.Cleanup()
Expand Down
31 changes: 14 additions & 17 deletions client_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
}
logging.SetDefaultLoggerAndFlusher(logger, logFlusher)

shutdownCtx, shutdown := context.WithCancel(context.Background())
rootCtx, shutdown := context.WithCancel(context.Background())
eg, ctx := errgroup.WithContext(rootCtx)
eng := &engine{
listeners: []*listener{},
opts: options,
workerPool: struct {
*errgroup.Group
shutdownCtx context.Context
shutdown context.CancelFunc
once sync.Once
}{&errgroup.Group{}, shutdownCtx, shutdown, sync.Once{}},
listeners: []*listener{},
opts: options,
turnOff: shutdown,
eventHandler: eh,
concurrency: struct {
*errgroup.Group
ctx context.Context
}{eg, ctx},
}
cli.el = &eventloop{
ch: make(chan any, 1024),
Expand All @@ -71,11 +71,11 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {

func (cli *Client) Start() error {
cli.el.eventHandler.OnBoot(Engine{cli.el.eng})
cli.el.eng.workerPool.Go(cli.el.run)
cli.el.eng.concurrency.Go(cli.el.run)
if cli.opts.Ticker {
cli.el.eng.ticker.ctx, cli.el.eng.ticker.cancel = context.WithCancel(context.Background())
cli.el.eng.workerPool.Go(func() error {
cli.el.ticker(cli.el.eng.ticker.ctx)
ctx := cli.el.eng.concurrency.ctx
cli.el.eng.concurrency.Go(func() error {
cli.el.ticker(ctx)
return nil
})
}
Expand All @@ -85,10 +85,7 @@ func (cli *Client) Start() error {

func (cli *Client) Stop() (err error) {
cli.el.ch <- errorx.ErrEngineShutdown
if cli.opts.Ticker {
cli.el.eng.ticker.cancel()
}
_ = cli.el.eng.workerPool.Wait()
err = cli.el.eng.concurrency.Wait()
cli.el.eventHandler.OnShutdown(Engine{cli.el.eng})
logging.Cleanup()
return
Expand Down
97 changes: 40 additions & 57 deletions engine_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"errors"
"runtime"
"strings"
"sync"
"sync/atomic"

"golang.org/x/sync/errgroup"
Expand All @@ -35,27 +34,22 @@ import (
)

type engine struct {
listeners map[int]*listener // listeners for accepting incoming connections
opts *Options // options with engine
ingress *eventloop // main event-loop that monitors all listeners
eventLoops loadBalancer // event-loops for handling events
inShutdown int32 // whether the engine is in shutdown
ticker struct {
ctx context.Context // context for ticker
cancel context.CancelFunc // function to stop the ticker
}
workerPool struct {
listeners map[int]*listener // listeners for accepting incoming connections
opts *Options // options with engine
ingress *eventloop // main event-loop that monitors all listeners
eventLoops loadBalancer // event-loops for handling events
inShutdown atomic.Bool // whether the engine is in shutdown
turnOff context.CancelFunc
eventHandler EventHandler // user eventHandler
concurrency struct {
*errgroup.Group

shutdownCtx context.Context
shutdown context.CancelFunc
once sync.Once
ctx context.Context
}
eventHandler EventHandler // user eventHandler
}

func (eng *engine) isInShutdown() bool {
return atomic.LoadInt32(&eng.inShutdown) == 1
func (eng *engine) isShutdown() bool {
return eng.inShutdown.Load()
}

// shutdown signals the engine to shut down.
Expand All @@ -64,9 +58,7 @@ func (eng *engine) shutdown(err error) {
eng.opts.Logger.Errorf("engine is being shutdown with error: %v", err)
}

eng.workerPool.once.Do(func() {
eng.workerPool.shutdown()
})
eng.turnOff()
}

func (eng *engine) closeEventLoops() {
Expand All @@ -88,7 +80,7 @@ func (eng *engine) closeEventLoops() {
}
}

func (eng *engine) runEventLoops(numEventLoop int) error {
func (eng *engine) runEventLoops(ctx context.Context, numEventLoop int) error {
var el0 *eventloop
lns := eng.listeners
// Create loops locally and bind the listeners.
Expand Down Expand Up @@ -129,21 +121,21 @@ func (eng *engine) runEventLoops(numEventLoop int) error {

// Start event-loops in background.
eng.eventLoops.iterate(func(_ int, el *eventloop) bool {
eng.workerPool.Go(el.run)
eng.concurrency.Go(el.run)
return true
})

if el0 != nil {
eng.workerPool.Go(func() error {
el0.ticker(eng.ticker.ctx)
eng.concurrency.Go(func() error {
el0.ticker(ctx)
return nil
})
}

return nil
}

func (eng *engine) activateReactors(numEventLoop int) error {
func (eng *engine) activateReactors(ctx context.Context, numEventLoop int) error {
for i := 0; i < numEventLoop; i++ {
p, err := netpoll.OpenPoller()
if err != nil {
Expand All @@ -161,7 +153,7 @@ func (eng *engine) activateReactors(numEventLoop int) error {

// Start sub reactors in background.
eng.eventLoops.iterate(func(_ int, el *eventloop) bool {
eng.workerPool.Go(el.orbit)
eng.concurrency.Go(el.orbit)
return true
})

Expand All @@ -183,30 +175,30 @@ func (eng *engine) activateReactors(numEventLoop int) error {
eng.ingress = el

// Start main reactor in background.
eng.workerPool.Go(el.rotate)
eng.concurrency.Go(el.rotate)

// Start the ticker.
if eng.opts.Ticker {
eng.workerPool.Go(func() error {
eng.ingress.ticker(eng.ticker.ctx)
eng.concurrency.Go(func() error {
eng.ingress.ticker(ctx)
return nil
})
}

return nil
}

func (eng *engine) start(numEventLoop int) error {
func (eng *engine) start(ctx context.Context, numEventLoop int) error {
if eng.opts.ReusePort {
return eng.runEventLoops(numEventLoop)
return eng.runEventLoops(ctx, numEventLoop)
}

return eng.activateReactors(numEventLoop)
return eng.activateReactors(ctx, numEventLoop)
}

func (eng *engine) stop(s Engine) {
func (eng *engine) stop(ctx context.Context, s Engine) {
// Wait on a signal for shutdown
<-eng.workerPool.shutdownCtx.Done()
<-ctx.Done()

eng.eventHandler.OnShutdown(s)

Expand All @@ -225,20 +217,15 @@ func (eng *engine) stop(s Engine) {
}
}

// Stop the ticker.
if eng.ticker.cancel != nil {
eng.ticker.cancel()
}

if err := eng.workerPool.Wait(); err != nil {
if err := eng.concurrency.Wait(); err != nil {
eng.opts.Logger.Errorf("engine shutdown error: %v", err)
}

// Close all listeners and pollers of event-loops.
eng.closeEventLoops()

// Put the engine into the shutdown state.
atomic.StoreInt32(&eng.inShutdown, 1)
eng.inShutdown.Store(true)
}

func run(eventHandler EventHandler, listeners []*listener, options *Options, addrs []string) error {
Expand All @@ -261,17 +248,17 @@ func run(eventHandler EventHandler, listeners []*listener, options *Options, add
for _, ln := range listeners {
lns[ln.fd] = ln
}
shutdownCtx, shutdown := context.WithCancel(context.Background())
rootCtx, shutdown := context.WithCancel(context.Background())
eg, ctx := errgroup.WithContext(rootCtx)
eng := engine{
listeners: lns,
opts: options,
workerPool: struct {
*errgroup.Group
shutdownCtx context.Context
shutdown context.CancelFunc
once sync.Once
}{&errgroup.Group{}, shutdownCtx, shutdown, sync.Once{}},
listeners: lns,
opts: options,
turnOff: shutdown,
eventHandler: eventHandler,
concurrency: struct {
*errgroup.Group
ctx context.Context
}{eg, ctx},
}
switch options.LB {
case RoundRobin:
Expand All @@ -282,23 +269,19 @@ func run(eventHandler EventHandler, listeners []*listener, options *Options, add
eng.eventLoops = new(sourceAddrHashLoadBalancer)
}

if eng.opts.Ticker {
eng.ticker.ctx, eng.ticker.cancel = context.WithCancel(context.Background())
}

e := Engine{&eng}
switch eng.eventHandler.OnBoot(e) {
case None:
case None, Close:
case Shutdown:
return nil
}

if err := eng.start(numEventLoop); err != nil {
if err := eng.start(ctx, numEventLoop); err != nil {
eng.closeEventLoops()
eng.opts.Logger.Errorf("gnet engine is stopping with error: %v", err)
return err
}
defer eng.stop(e)
defer eng.stop(rootCtx, e)

for _, addr := range addrs {
allEngines.Store(addr, &eng)
Expand Down
Loading

0 comments on commit 213e164

Please sign in to comment.