Skip to content

Commit

Permalink
daemon: Introduce the minimal fx application
Browse files Browse the repository at this point in the history
As the first step towards a more modular cilium-agent
application, bring in uber/fx by making runDaemon into a
provider of *Daemon.

This retains all existing behaviour, but will allow us
to incrementally lift modules out from Daemon.

The daemon cleanup is refactored to fit into uber/fx by
wrapping it into a module that adds a stop hook to run the
registered cleanup functions.

Signed-off-by: Jussi Maki <[email protected]>
  • Loading branch information
joamaki authored and aanm committed Aug 2, 2022
1 parent 40da4c6 commit cef7071
Show file tree
Hide file tree
Showing 78 changed files with 11,147 additions and 121 deletions.
42 changes: 42 additions & 0 deletions daemon/cmd/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package cmd

import (
"context"

"go.uber.org/fx"
)

func runApp() {
ctx, cancel := context.WithCancel(context.Background())
app := fx.New(
fx.WithLogger(newAppLogger),
fx.Supply(fx.Annotate(ctx, fx.As(new(context.Context)))),
cleanerModule,
fx.Provide(daemonModule),
fx.Invoke(func(*Daemon) {}),

// The first thing to do when stopping is to cancel the
// daemon-wide context.
fx.Invoke(appendOnStop(cancel)),
)

if app.Err() != nil {
log.WithError(app.Err()).Fatal("Failed to initialize daemon")
}

app.Run()
}

func appendOnStop(onStop func()) func(fx.Lifecycle) {
return func(lc fx.Lifecycle) {
lc.Append(fx.Hook{
OnStop: func(context.Context) error {
onStop()
return nil
},
})
}
}
148 changes: 148 additions & 0 deletions daemon/cmd/app_logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package cmd

import (
"strings"

"github.com/sirupsen/logrus"
"go.uber.org/fx/fxevent"
)

type appLogger struct {
*logrus.Entry
}

func newAppLogger() fxevent.Logger {
return appLogger{Entry: log}
}

func (log appLogger) LogEvent(event fxevent.Event) {
switch e := event.(type) {
case *fxevent.OnStartExecuting:
log.WithField("callee", e.FunctionName).
WithField("caller", e.CallerName).
Debug("OnStart hook executing")

case *fxevent.OnStartExecuted:
if e.Err != nil {
log.WithField("callee", e.FunctionName).
WithField("caller", e.CallerName).
WithError(e.Err).
Debug("OnStart hook failed")
} else {
log.WithField("callee", e.FunctionName).
WithField("caller", e.CallerName).
WithField("runtime", e.Runtime.String()).
Debug("OnStart hook executed")
}

case *fxevent.OnStopExecuting:
log.WithField("callee", e.FunctionName).
WithField("caller", e.CallerName).
Debug("OnStop hook executing")

case *fxevent.OnStopExecuted:
if e.Err != nil {
log.WithField("callee", e.FunctionName).
WithField("caller", e.CallerName).
WithError(e.Err).
Error("OnStop hook failed")
} else {
log.WithField("callee", e.FunctionName).
WithField("caller", e.CallerName).
WithField("runtime", e.Runtime.String()).
Debug("OnStop hook executed")
}

case *fxevent.Supplied:
l := log.WithField("type", e.TypeName)
if len(e.ModuleName) != 0 {
l = l.WithField("module", e.ModuleName)
}
if e.Err != nil {
l = l.WithError(e.Err)
}
l.Debug("Supplied")

case *fxevent.Provided:
l := log.WithField("constructor", e.ConstructorName)
if len(e.ModuleName) != 0 {
l = l.WithField("module", e.ModuleName)
}

for _, rtype := range e.OutputTypeNames {
l.WithField("type", rtype).Debug("Provided")
}
if e.Err != nil {
l.WithError(e.Err).
Error("Error encountered while applying options")
}

case *fxevent.Decorated:
l := log.WithField("decorator", e.DecoratorName)
if len(e.ModuleName) != 0 {
l = l.WithField("module", e.ModuleName)
}
for _, rtype := range e.OutputTypeNames {
l.WithField("type", rtype).Debug("decorated")
}
if e.Err != nil {
l.WithError(e.Err).
Error("Error encountered while applying options")
}

case *fxevent.Invoking:
l := log.WithField("function", e.FunctionName)
if len(e.ModuleName) != 0 {
l = l.WithField("module", e.ModuleName)
}
l.Debug("Invoking")

case *fxevent.Invoked:
if e.Err != nil {
l := log.WithError(e.Err)
if len(e.ModuleName) != 0 {
l = l.WithField("module", e.ModuleName)
}
l.WithField("stack", e.Trace).
WithField("function", e.FunctionName).
Error("Invoke failed")
}

case *fxevent.Stopping:
log.WithField("signal", strings.ToUpper(e.Signal.String())).
Info("Stopping")

case *fxevent.Stopped:
if e.Err != nil {
log.WithError(e.Err).Error("Stop failed")
} else {
log.Info("Stopped")
}

case *fxevent.RollingBack:
log.WithError(e.StartErr).Error("Start failed, rolling back")

case *fxevent.RolledBack:
if e.Err != nil {
log.WithError(e.Err).Error("Rollback failed")
}

case *fxevent.Started:
if e.Err != nil {
log.WithError(e.Err).Error("Start failed")
} else {
log.Info("Started")
}

case *fxevent.LoggerInitialized:
if e.Err != nil {
log.WithError(e.Err).Error("Custom logger initialization failed")
} else {
log.WithField("function", e.ConstructorName).
Info("Initialized custom fxevent.Logger")
}
}
}
92 changes: 31 additions & 61 deletions daemon/cmd/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,46 @@ package cmd

import (
"context"
"os"
"os/signal"
"sync"

gops "github.com/google/gops/agent"
"golang.org/x/sys/unix"
"go.uber.org/fx"

"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/pidfile"
)

var cleaner = &daemonCleanup{
cleanUPSig: make(chan struct{}),
cleanUPWg: &sync.WaitGroup{},
cleanupFuncs: &cleanupFuncList{
funcs: make([]func(), 0),
},
preCleanupFuncs: &cleanupFuncList{
funcs: make([]func(), 0),
},
// cleanerModule exposes the cleaner as a fx module and runs
// the cleanup functions on stop.
var cleanerModule = fx.Module(
"cleaner",
fx.Provide(newCleaner),
)

func newCleaner(lc fx.Lifecycle) *daemonCleanup {
cleaner := NewDaemonCleanup()
lc.Append(fx.Hook{
OnStop: func(context.Context) error {
cleaner.Clean()
return nil
},
})
return cleaner
}

type daemonCleanup struct {
lock.Mutex
// cleanUPSig channel that is closed when the daemon agent should be
// terminated.
cleanUPSig chan struct{}
// cleanUPWg all cleanup operations will be marked as Done() when completed.
cleanUPWg *sync.WaitGroup

preCleanupFuncs *cleanupFuncList
cleanupFuncs *cleanupFuncList
}

cleanupFuncs *cleanupFuncList

sigHandlerCancel context.CancelFunc
func NewDaemonCleanup() *daemonCleanup {
return &daemonCleanup{
cleanupFuncs: &cleanupFuncList{
funcs: make([]func(), 0),
},
preCleanupFuncs: &cleanupFuncList{
funcs: make([]func(), 0),
},
}
}

type cleanupFuncList struct {
Expand All @@ -61,46 +66,11 @@ func (c *cleanupFuncList) Run() {
}
}

func (d *daemonCleanup) registerSigHandler() <-chan struct{} {
sig := make(chan os.Signal, 1)
signal.Notify(sig, unix.SIGQUIT, unix.SIGINT, unix.SIGHUP, unix.SIGTERM)
interrupt := make(chan struct{})
go func() {
for s := range sig {
log.WithField("signal", s).Info("Exiting due to signal")
d.preCleanupFuncs.Run()
log.Debug("canceling context in signal handler")
d.Lock()
if d.sigHandlerCancel != nil {
d.sigHandlerCancel()
}
d.Unlock()
pidfile.Clean()
d.Clean()
d.cleanupFuncs.Run()
// nolint
break
}
close(interrupt)
}()
return interrupt
}

// Clean cleans up everything created by this package.
func (d *daemonCleanup) Clean() {
gops.Close()
close(d.cleanUPSig)
d.cleanUPWg.Wait()
}
d.preCleanupFuncs.Run()
pidfile.Clean()
d.cleanupFuncs.Run()

// SetCancelFunc sets the function which is called when we receive a signal to
// propagate cancelation down to ongoing operations. If it's already set,
// it does nothing.
func (d *daemonCleanup) SetCancelFunc(cfunc context.CancelFunc) {
d.Lock()
defer d.Unlock()
if d.sigHandlerCancel != nil {
return
}
d.sigHandlerCancel = cfunc
}
8 changes: 1 addition & 7 deletions daemon/cmd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ const (
// monitoring when a LXC starts.
type Daemon struct {
ctx context.Context
cancel context.CancelFunc
buildEndpointSem *semaphore.Weighted
l7Proxy *proxy.Proxy
svc *service.Service
Expand Down Expand Up @@ -363,11 +362,7 @@ func removeOldRouterState(ipv6 bool, restoredIP net.IP) error {
}

// NewDaemon creates and returns a new Daemon with the parameters set in c.
func NewDaemon(ctx context.Context, cancel context.CancelFunc, epMgr *endpointmanager.EndpointManager, dp datapath.Datapath) (*Daemon, *endpointRestoreState, error) {

// Pass the cancel to our signal handler directly so that it's canceled
// before we run the cleanup functions (see `cleanup.go` for implementation).
cleaner.SetCancelFunc(cancel)
func NewDaemon(ctx context.Context, cleaner *daemonCleanup, epMgr *endpointmanager.EndpointManager, dp datapath.Datapath) (*Daemon, *endpointRestoreState, error) {

var (
err error
Expand Down Expand Up @@ -503,7 +498,6 @@ func NewDaemon(ctx context.Context, cancel context.CancelFunc, epMgr *endpointma

d := Daemon{
ctx: ctx,
cancel: cancel,
prefixLengths: createPrefixLengthCounter(),
buildEndpointSem: semaphore.NewWeighted(int64(numWorkerThreads())),
compilationMutex: new(lock.RWMutex),
Expand Down
Loading

0 comments on commit cef7071

Please sign in to comment.