From 329a67047c4c6a25ca2b631de02e921cef2bc4b1 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Thu, 18 Aug 2022 19:35:09 +0800 Subject: [PATCH 1/6] Fix race condition on custom libbeat instrumentation --- internal/beater/beater.go | 34 ++++++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/internal/beater/beater.go b/internal/beater/beater.go index b3988bdef79..3ca60819bca 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -235,7 +235,7 @@ func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b * if b.Config != nil { reloader.outputConfig = b.Config.Output } - if err := reloader.reload(); err != nil { + if err := reloader.reloadOnce(); err != nil { return err } } @@ -289,6 +289,7 @@ func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error { } r.mu.Lock() + defer r.mu.Unlock() r.rawConfig = integrationConfig.APMServer // Merge in datastream namespace passed in from apm integration if integrationConfig.DataStream != nil && integrationConfig.DataStream.Namespace != "" { @@ -300,7 +301,6 @@ func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error { } } r.fleetConfig = &integrationConfig.Fleet - r.mu.Unlock() return r.reload() } @@ -312,14 +312,23 @@ func (r *reloader) reloadOutput(config *reload.ConfigWithMeta) error { } } r.mu.Lock() + defer r.mu.Unlock() r.outputConfig = outputConfig - r.mu.Unlock() return r.reload() } -func (r *reloader) reload() error { +func (r *reloader) reloadOnce() error { r.mu.Lock() defer r.mu.Unlock() + + return r.reload() +} + +// reload creates a new serverRunner, launches it in a new goroutine, waits +// for it to have successfully started and returns after waiting for the +// previous serverRunner to exit. Calls to reload must be sycnhronized +// explicitly by acquiring reloader#mu by callers. +func (r *reloader) reload() error { if r.rawConfig == nil { // APM Server config not loaded yet. 
return nil @@ -345,6 +354,17 @@ func (r *reloader) reload() error { r.args.Logger.Error(err) } }() + + // Wait for the new runner to start; this avoids the race condition in updating + // the monitoring#Deafult global registry inside the runner introduced due to two + // reloads (one for input and the other for output) + select { + case <-runner.done: + return errors.New("runner exited unexpectedly") + case <-runner.started: + // runner has started + } + // If the old runner exists, cancel it if r.runner != nil { r.runner.cancelRunServerContext() @@ -364,6 +384,7 @@ type serverRunner struct { // immediately when the Stop method is invoked. runServerContext context.Context cancelRunServerContext context.CancelFunc + started chan struct{} done chan struct{} pipeline beat.PipelineConnector @@ -416,6 +437,7 @@ func newServerRunner(ctx context.Context, args serverRunnerParams) (*serverRunne runServerContext: runServerContext, cancelRunServerContext: cancel, done: make(chan struct{}), + started: make(chan struct{}), config: cfg, rawConfig: args.RawConfig, @@ -581,6 +603,10 @@ func (s *serverRunner) run(listener net.Listener) error { NewElasticsearchClient: newElasticsearchClient, }) }) + + // Signal that the runner has started + close(s.started) + result := g.Wait() if err := closeFinalBatchProcessor(s.backgroundContext); err != nil { result = multierror.Append(result, err) From 92312d88207a2618d5b9b5e497a5f752a75b09bc Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Thu, 18 Aug 2022 19:54:53 +0800 Subject: [PATCH 2/6] Remove unnecessary locking --- internal/beater/beater.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/internal/beater/beater.go b/internal/beater/beater.go index 3ca60819bca..ff2aed80ce2 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -690,19 +690,12 @@ func (s *serverRunner) waitReady(ctx context.Context, kibanaClient kibana.Client return waitReady(ctx, s.config.WaitReadyInterval, s.tracer, s.logger, check) } -// This 
mutex must be held when updating the libbeat monitoring registry, -// as there may be multiple servers running concurrently. -var monitoringRegistryMu sync.Mutex - // newFinalBatchProcessor returns the final model.BatchProcessor that publishes events, // and a cleanup function which should be called on server shutdown. If the output is // "elasticsearch", then we use modelindexer; otherwise we use the libbeat publisher. func (s *serverRunner) newFinalBatchProcessor( newElasticsearchClient func(cfg *elasticsearch.Config) (elasticsearch.Client, error), ) (model.BatchProcessor, func(context.Context) error, error) { - monitoringRegistryMu.Lock() - defer monitoringRegistryMu.Unlock() - if s.elasticsearchOutputConfig == nil { // When the publisher stops cleanly it will close its pipeline client, // calling the acker's Close method. We need to call Open for each new From 780c15bd13b0c705648e2e3a949e6c366a92548d Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Thu, 18 Aug 2022 21:22:58 +0800 Subject: [PATCH 3/6] Update changelog --- changelogs/head.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index 3128731ddbe..708d4c1e329 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -14,6 +14,7 @@ https://github.com/elastic/apm-server/compare/8.5\...main[View commits] ==== Bug fixes - Set `message` instead of `labels.event` for Jaeger span events {pull}8765[8765] - Fix event loss during reload of TBS processor {pull}8809[8809] +- Fix race condition on custom libbeat instrumentation {pull}8900[8900] [float] ==== Intake API Changes From a780cf36a2c776a4cbe9f533c8c6d107b64cd4de Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Thu, 18 Aug 2022 22:57:17 +0800 Subject: [PATCH 4/6] fix typo --- changelogs/head.asciidoc | 2 +- internal/beater/beater.go | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index 
708d4c1e329..81eb8866fed 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -14,7 +14,7 @@ https://github.com/elastic/apm-server/compare/8.5\...main[View commits] ==== Bug fixes - Set `message` instead of `labels.event` for Jaeger span events {pull}8765[8765] - Fix event loss during reload of TBS processor {pull}8809[8809] -- Fix race condition on custom libbeat instrumentation {pull}8900[8900] +- Fix race condition in custom libbeat instrumentation {pull}8900[8900] [float] ==== Intake API Changes diff --git a/internal/beater/beater.go b/internal/beater/beater.go index ff2aed80ce2..755f4689176 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -325,9 +325,9 @@ func (r *reloader) reloadOnce() error { } // reload creates a new serverRunner, launches it in a new goroutine, waits -// for it to have successfully started and returns after waiting for the -// previous serverRunner to exit. Calls to reload must be sycnhronized -// explicitly by acquiring reloader#mu by callers. +// for it to have successfully started and returns after waiting for the previous +// serverRunner (if any) to exit. Calls to reload must be synchronized explicitly +// by acquiring reloader#mu by callers. func (r *reloader) reload() error { if r.rawConfig == nil { // APM Server config not loaded yet. 
@@ -356,8 +356,8 @@ func (r *reloader) reload() error { }() // Wait for the new runner to start; this avoids the race condition in updating - // the monitoring#Deafult global registry inside the runner introduced due to two - // reloads (one for input and the other for output) + // the monitoring#Default global registry inside the runner due to two reloads, + // one for the inputs and the other for the elasticsearch output select { case <-runner.done: return errors.New("runner exited unexpectedly") From 19b79fe4a089aa43f14132b2e5fa863f243da234 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Thu, 18 Aug 2022 23:07:02 +0800 Subject: [PATCH 5/6] Update changelog entry --- changelogs/head.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index 81eb8866fed..58fdee03cc3 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -14,7 +14,7 @@ https://github.com/elastic/apm-server/compare/8.5\...main[View commits] ==== Bug fixes - Set `message` instead of `labels.event` for Jaeger span events {pull}8765[8765] - Fix event loss during reload of TBS processor {pull}8809[8809] -- Fix race condition in custom libbeat instrumentation {pull}8900[8900] +- Fix sporadically missing custom libbeat metrics {pull}8900[8900] [float] ==== Intake API Changes From 160cdad9e2013147a01478e852ebd7d17bd45759 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Mon, 22 Aug 2022 11:03:54 +0800 Subject: [PATCH 6/6] Inline call to reload for standalone mode --- internal/beater/beater.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/internal/beater/beater.go b/internal/beater/beater.go index 755f4689176..5aa743e8a05 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -235,7 +235,10 @@ func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b * if b.Config != nil { reloader.outputConfig = b.Config.Output } - if err := reloader.reloadOnce(); 
err != nil { + reloader.mu.Lock() + err := reloader.reload() + reloader.mu.Unlock() + if err != nil { return err } } @@ -317,13 +320,6 @@ func (r *reloader) reloadOutput(config *reload.ConfigWithMeta) error { return r.reload() } -func (r *reloader) reloadOnce() error { - r.mu.Lock() - defer r.mu.Unlock() - - return r.reload() -} - // reload creates a new serverRunner, launches it in a new goroutine, waits // for it to have successfully started and returns after waiting for the previous // serverRunner (if any) to exit. Calls to reload must be synchronized explicitly