Fix race condition in custom libbeat instrumentation #8900
Changes from all commits: 329a670, 92312d8, 780c15b, a780cf3, 19b79fe, 160cdad, d358eab
@@ -235,7 +235,10 @@ func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b *
 	if b.Config != nil {
 		reloader.outputConfig = b.Config.Output
 	}
-	if err := reloader.reload(); err != nil {
+	reloader.mu.Lock()
+	err := reloader.reload()
+	reloader.mu.Unlock()
+	if err != nil {
 		return err
 	}
 }
@@ -289,6 +292,7 @@ func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error {
 	}

 	r.mu.Lock()
+	defer r.mu.Unlock()
 	r.rawConfig = integrationConfig.APMServer
 	// Merge in datastream namespace passed in from apm integration
 	if integrationConfig.DataStream != nil && integrationConfig.DataStream.Namespace != "" {

@@ -300,7 +304,6 @@ func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error {
 		}
 	}
 	r.fleetConfig = &integrationConfig.Fleet
-	r.mu.Unlock()
 	return r.reload()
 }
@@ -312,14 +315,16 @@ func (r *reloader) reloadOutput(config *reload.ConfigWithMeta) error {
 		}
 	}
 	r.mu.Lock()
+	defer r.mu.Unlock()
 	r.outputConfig = outputConfig
-	r.mu.Unlock()
 	return r.reload()
 }

+// reload creates a new serverRunner, launches it in a new goroutine, waits
+// for it to have successfully started and returns after waiting for the previous
+// serverRunner (if any) to exit. Calls to reload must be synchronized explicitly
+// by callers acquiring reloader#mu.
 func (r *reloader) reload() error {
-	r.mu.Lock()
-	defer r.mu.Unlock()
 	if r.rawConfig == nil {
 		// APM Server config not loaded yet.
 		return nil
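To make the locking convention in these hunks concrete, here is a minimal, self-contained sketch (assumed types and field names, not the actual apm-server code): the exported Reload entry point owns the mutex, and the unexported reload assumes the caller already holds it.

```go
// Sketch only: illustrates "callers hold the lock, reload assumes it is held".
package main

import "sync"

type config struct{ name string }

type reloader struct {
	mu        sync.Mutex // guards rawConfig and the reload sequence
	rawConfig *config
}

// Reload is the externally visible entry point; it owns the critical section.
func (r *reloader) Reload(c *config) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.rawConfig = c
	return r.reload()
}

// reload must only be called with r.mu held by the caller, so that two
// overlapping reloads can never start runners concurrently.
func (r *reloader) reload() error {
	if r.rawConfig == nil {
		return nil // nothing to apply yet
	}
	// ... start a new runner here ...
	return nil
}

func main() {
	r := &reloader{}
	_ = r.Reload(&config{name: "apm-server"})
}
```

Keeping reload itself lock-free and documenting the caller's obligation avoids recursive locking while still guaranteeing that only one reload sequence runs at a time.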
@@ -345,6 +350,17 @@ func (r *reloader) reload() error {
 			r.args.Logger.Error(err)
 		}
 	}()
+
+	// Wait for the new runner to start; this avoids the race condition in updating
+	// the monitoring#Default global registry inside the runner due to two reloads,
+	// one for the inputs and the other for the elasticsearch output
+	select {
+	case <-runner.done:
+		return errors.New("runner exited unexpectedly")
+	case <-runner.started:
+		// runner has started
+	}
+
 	// If the old runner exists, cancel it
 	if r.runner != nil {
 		r.runner.cancelRunServerContext()
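The started/done handshake that the comment above describes can be sketched in isolation as follows (a hedged illustration with made-up names, not the apm-server implementation): the runner closes started once its setup, including any registry updates, is complete, and the caller blocks on a select until either started or done fires.

```go
// Sketch only: wait for a background runner to signal startup before proceeding.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type runner struct {
	started chan struct{}
	done    chan struct{}
}

func newRunner() *runner {
	return &runner{
		started: make(chan struct{}),
		done:    make(chan struct{}),
	}
}

func (r *runner) run(ctx context.Context) {
	defer close(r.done)
	// ... set up outputs and monitoring state here ...
	close(r.started) // signal that the runner has started
	<-ctx.Done()     // keep serving until cancelled
}

// startRunner launches a runner and waits for it to start, mirroring the
// select added in reload().
func startRunner(ctx context.Context) (*runner, error) {
	r := newRunner()
	go r.run(ctx)
	select {
	case <-r.done:
		return nil, errors.New("runner exited unexpectedly")
	case <-r.started:
		return r, nil
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	if _, err := startRunner(ctx); err != nil {
		fmt.Println("start failed:", err)
		return
	}
	fmt.Println("runner started; the previous runner can now be cancelled safely")
}
```

Because a channel close is observed by every receiver, the same started channel also lets any later waiter confirm the runner is up without extra state.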
@@ -364,6 +380,7 @@ type serverRunner struct {
 	// immediately when the Stop method is invoked.
 	runServerContext       context.Context
 	cancelRunServerContext context.CancelFunc
+	started                chan struct{}
 	done                   chan struct{}

 	pipeline beat.PipelineConnector
@@ -416,6 +433,7 @@ func newServerRunner(ctx context.Context, args serverRunnerParams) (*serverRunne
 		runServerContext:       runServerContext,
 		cancelRunServerContext: cancel,
 		done:                   make(chan struct{}),
+		started:                make(chan struct{}),

 		config:    cfg,
 		rawConfig: args.RawConfig,
@@ -581,6 +599,10 @@ func (s *serverRunner) run(listener net.Listener) error {
 			NewElasticsearchClient: newElasticsearchClient,
 		})
 	})
+
+	// Signal that the runner has started
+	close(s.started)
+
 	result := g.Wait()
 	if err := closeFinalBatchProcessor(s.backgroundContext); err != nil {
 		result = multierror.Append(result, err)
@@ -664,19 +686,12 @@ func (s *serverRunner) waitReady(ctx context.Context, kibanaClient kibana.Client
 	return waitReady(ctx, s.config.WaitReadyInterval, s.tracer, s.logger, check)
 }

-// This mutex must be held when updating the libbeat monitoring registry,
-// as there may be multiple servers running concurrently.
-var monitoringRegistryMu sync.Mutex
-
 // newFinalBatchProcessor returns the final model.BatchProcessor that publishes events,
 // and a cleanup function which should be called on server shutdown. If the output is
 // "elasticsearch", then we use modelindexer; otherwise we use the libbeat publisher.
 func (s *serverRunner) newFinalBatchProcessor(
 	newElasticsearchClient func(cfg *elasticsearch.Config) (elasticsearch.Client, error),
 ) (model.BatchProcessor, func(context.Context) error, error) {
-	monitoringRegistryMu.Lock()
-	defer monitoringRegistryMu.Unlock()
-
 	if s.elasticsearchOutputConfig == nil {
 		// When the publisher stops cleanly it will close its pipeline client,
 		// calling the acker's Close method. We need to call Open for each new

Review comment (on the removed monitoringRegistryMu): [For reviewers] IIUC, this mutex is not required. Earlier the mutex was preventing the default monitoring registry from being accessed concurrently; however, that was not enough to prevent the race, as a temporary reload's […] The current PR makes sure that […]
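As a rough, purely illustrative sketch of the point made in that review comment (none of these names come from libbeat): a mutex around each registry update only makes the individual updates race-free; it does not order them, so while two runners overlap the global registry can still end up describing whichever one happened to write last. Serialising reloads and waiting for each runner to start removes that ambiguity.

```go
// Sketch only: a mutex prevents data races but not "last writer wins" ordering.
package main

import (
	"fmt"
	"sync"
)

var (
	mu            sync.Mutex
	registryOwner string // stands in for state held in a global monitoring registry
)

func registerOwner(name string) {
	mu.Lock()
	defer mu.Unlock()
	registryOwner = name // atomic, but unordered across runners
}

func main() {
	var wg sync.WaitGroup
	for _, name := range []string{"runner-for-input-reload", "runner-for-output-reload"} {
		wg.Add(1)
		go func(n string) {
			defer wg.Done()
			registerOwner(n)
		}(name)
	}
	wg.Wait()
	// Which runner the registry reflects depends on scheduling; serialising the
	// reloads and waiting for each runner to start removes the ambiguity.
	fmt.Println("registry owned by:", registryOwner)
}
```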
Review comment: Nice catch and great solution!