From d191a3b4f3fb3cfab8b6acfdffdb178bbf714304 Mon Sep 17 00:00:00 2001 From: Paschalis Tsilias Date: Mon, 1 Jul 2024 11:18:11 +0300 Subject: [PATCH 1/5] Fix race condition around supervisor's Commander Signed-off-by: Paschalis Tsilias --- .../supervisor/supervisor/commander/commander.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/internal/examples/supervisor/supervisor/commander/commander.go b/internal/examples/supervisor/supervisor/commander/commander.go index bd2f1f69..44a3064e 100644 --- a/internal/examples/supervisor/supervisor/commander/commander.go +++ b/internal/examples/supervisor/supervisor/commander/commander.go @@ -23,7 +23,7 @@ type Commander struct { cmd *exec.Cmd doneCh chan struct{} waitCh chan struct{} - running int64 + running atomic.Bool } func NewCommander(logger types.Logger, cfg *config.Agent, args ...string) (*Commander, error) { @@ -63,7 +63,7 @@ func (c *Commander) Start(ctx context.Context) error { } c.logger.Debugf(ctx, "Agent process started, PID=%d", c.cmd.Process.Pid) - atomic.StoreInt64(&c.running, 1) + c.running.Store(true) go c.watch() @@ -83,7 +83,7 @@ func (c *Commander) Restart(ctx context.Context) error { func (c *Commander) watch() { c.cmd.Wait() c.doneCh <- struct{}{} - atomic.StoreInt64(&c.running, 0) + c.running.Store(false) close(c.waitCh) } @@ -102,21 +102,21 @@ func (c *Commander) Pid() int { // ExitCode returns Agent process exit code if it exited or 0 if it is not. func (c *Commander) ExitCode() int { - if c.cmd == nil || c.cmd.ProcessState == nil { + if c.IsRunning() { return 0 } return c.cmd.ProcessState.ExitCode() } func (c *Commander) IsRunning() bool { - return atomic.LoadInt64(&c.running) != 0 + return c.running.Load() } // Stop the Agent process. Sends SIGTERM to the process and wait for up 10 seconds // and if the process does not finish kills it forcedly by sending SIGKILL. // Returns after the process is terminated. func (c *Commander) Stop(ctx context.Context) error { - if c.cmd == nil || c.cmd.Process == nil { + if !c.IsRunning() { // Not started, nothing to do. return nil } @@ -159,7 +159,7 @@ func (c *Commander) Stop(ctx context.Context) error { // Wait for process to terminate <-c.waitCh - atomic.StoreInt64(&c.running, 0) + c.running.Store(false) // Let goroutine know process is finished. close(finished) From b45a84fcba2293128b8bdae52a231163c34c0855 Mon Sep 17 00:00:00 2001 From: Paschalis Tsilias Date: Tue, 9 Jul 2024 18:58:36 +0300 Subject: [PATCH 2/5] Reuse new condition Signed-off-by: Paschalis Tsilias --- internal/examples/supervisor/supervisor/commander/commander.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/examples/supervisor/supervisor/commander/commander.go b/internal/examples/supervisor/supervisor/commander/commander.go index 44a3064e..8d982a15 100644 --- a/internal/examples/supervisor/supervisor/commander/commander.go +++ b/internal/examples/supervisor/supervisor/commander/commander.go @@ -94,7 +94,7 @@ func (c *Commander) Done() <-chan struct{} { // Pid returns Agent process PID if it is started or 0 if it is not. func (c *Commander) Pid() int { - if c.cmd == nil || c.cmd.Process == nil { + if !c.IsRunning() { return 0 } return c.cmd.Process.Pid From 1337ec44d3d33aea44dcc8435ae9a5ef9eecc74f Mon Sep 17 00:00:00 2001 From: Paschalis Tsilias Date: Wed, 14 Aug 2024 22:38:27 +0300 Subject: [PATCH 3/5] Synchronize stopping explicitly Signed-off-by: Paschalis Tsilias --- .../supervisor/commander/commander.go | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/internal/examples/supervisor/supervisor/commander/commander.go b/internal/examples/supervisor/supervisor/commander/commander.go index 8d982a15..1f6802a2 100644 --- a/internal/examples/supervisor/supervisor/commander/commander.go +++ b/internal/examples/supervisor/supervisor/commander/commander.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "os/exec" + "sync" "sync/atomic" "syscall" "time" @@ -24,6 +25,10 @@ type Commander struct { doneCh chan struct{} waitCh chan struct{} running atomic.Bool + + // True when stopping is in progress. + isStoppingFlag bool + isStoppingMutex sync.RWMutex } func NewCommander(logger types.Logger, cfg *config.Agent, args ...string) (*Commander, error) { @@ -41,6 +46,10 @@ func NewCommander(logger types.Logger, cfg *config.Agent, args ...string) (*Comm // Start the Agent and begin watching the process. // Agent's stdout and stderr are written to a file. func (c *Commander) Start(ctx context.Context) error { + if c.IsStopping() { + return nil + } + c.logger.Debugf(ctx, "Starting agent %s", c.cfg.Executable) logFilePath := "agent.log" @@ -121,6 +130,10 @@ func (c *Commander) Stop(ctx context.Context) error { return nil } + c.isStoppingMutex.Lock() + c.isStoppingFlag = true + c.isStoppingMutex.Unlock() + c.logger.Debugf(ctx, "Stopping agent process, PID=%v", c.cmd.Process.Pid) // Gracefully signal process to stop. @@ -166,3 +179,10 @@ func (c *Commander) Stop(ctx context.Context) error { return innerErr } + +// IsStopping returns true if Stop() was called. +func (c *Commander) IsStopping() bool { + c.isStoppingMutex.RLock() + defer c.isStoppingMutex.RUnlock() + return c.isStoppingFlag +} From f1840f7e044686de6396473615c666f3aa78a422 Mon Sep 17 00:00:00 2001 From: Paschalis Tsilias Date: Fri, 30 Aug 2024 00:16:46 +0300 Subject: [PATCH 4/5] Set IsStopping flag first things first Signed-off-by: Paschalis Tsilias --- .../supervisor/supervisor/commander/commander.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/internal/examples/supervisor/supervisor/commander/commander.go b/internal/examples/supervisor/supervisor/commander/commander.go index 1f6802a2..13fd7333 100644 --- a/internal/examples/supervisor/supervisor/commander/commander.go +++ b/internal/examples/supervisor/supervisor/commander/commander.go @@ -125,15 +125,14 @@ func (c *Commander) IsRunning() bool { // and if the process does not finish kills it forcedly by sending SIGKILL. // Returns after the process is terminated. func (c *Commander) Stop(ctx context.Context) error { - if !c.IsRunning() { - // Not started, nothing to do. - return nil - } - c.isStoppingMutex.Lock() c.isStoppingFlag = true c.isStoppingMutex.Unlock() + if !c.IsRunning() { + // Not started, nothing to do. + return nil + } c.logger.Debugf(ctx, "Stopping agent process, PID=%v", c.cmd.Process.Pid) // Gracefully signal process to stop. From f135e3b0ef6c4282388497bb9df315973f7e6120 Mon Sep 17 00:00:00 2001 From: Paschalis Tsilias Date: Fri, 30 Aug 2024 00:50:23 +0300 Subject: [PATCH 5/5] Add test for verifying proper shutdown Signed-off-by: Paschalis Tsilias --- .../supervisor/supervisor/supervisor_test.go | 33 ++++++++++++++++++- internal/noplogger.go | 10 ++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/internal/examples/supervisor/supervisor/supervisor_test.go b/internal/examples/supervisor/supervisor/supervisor_test.go index 45687201..fbadeea7 100644 --- a/internal/examples/supervisor/supervisor/supervisor_test.go +++ b/internal/examples/supervisor/supervisor/supervisor_test.go @@ -4,7 +4,8 @@ import ( "fmt" "os" "testing" - + "time" + "github.com/stretchr/testify/assert" "github.com/open-telemetry/opamp-go/internal" @@ -62,3 +63,33 @@ agent: supervisor.Shutdown() } + +func TestShutdownRaceCondition(t *testing.T) { + tmpDir := changeCurrentDir(t) + os.WriteFile("supervisor.yaml", []byte(fmt.Sprintf(` +server: + endpoint: ws://127.0.0.1:4320/v1/opamp +agent: + executable: %s/dummy_agent.sh`, tmpDir)), 0644) + + os.WriteFile("dummy_agent.sh", []byte("#!/bin/sh\nsleep 9999\n"), 0755) + + startOpampServer(t) + + // There's no great way to ensure Shutdown gets called before Start. + // The DelayLogger ensures some delay before the goroutine gets started. + var supervisor *Supervisor + var err error + supervisor, err = NewSupervisor(&internal.DelayLogger{}) + supervisor.Shutdown() + supervisor.hasNewConfig <- struct{}{} + + assert.NoError(t, err) + + // The Shutdown method has been called before the runAgentProcess goroutine + // gets started and has a chance to load a new process. Make sure no PID + // has been launched. + assert.Never(t, func() bool { + return supervisor.commander.Pid() != 0 + }, 2*time.Second, 10*time.Millisecond) +} diff --git a/internal/noplogger.go b/internal/noplogger.go index a2b2ea27..9807e6e6 100644 --- a/internal/noplogger.go +++ b/internal/noplogger.go @@ -2,6 +2,7 @@ package internal import ( "context" + "time" "github.com/open-telemetry/opamp-go/client/types" ) @@ -12,3 +13,12 @@ type NopLogger struct{} func (l *NopLogger) Debugf(ctx context.Context, format string, v ...interface{}) {} func (l *NopLogger) Errorf(ctx context.Context, format string, v ...interface{}) {} + +type DelayLogger struct{} + +func (l *DelayLogger) Debugf(ctx context.Context, format string, v ...interface{}) { + time.Sleep(10 * time.Millisecond) +} +func (l *DelayLogger) Errorf(ctx context.Context, format string, v ...interface{}) { + time.Sleep(10 * time.Millisecond) +}