From eb9f610997a58b82c36a3729139de4514f80ed8a Mon Sep 17 00:00:00 2001
From: Carson Ip
Date: Fri, 11 Oct 2024 11:26:25 +0100
Subject: [PATCH 1/7] Add mutex to avoid concurrent gc during hot reload

Fixes #14305
---
 x-pack/apm-server/sampling/processor.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/x-pack/apm-server/sampling/processor.go b/x-pack/apm-server/sampling/processor.go
index 82dc2df59aa..4f5731be56a 100644
--- a/x-pack/apm-server/sampling/processor.go
+++ b/x-pack/apm-server/sampling/processor.go
@@ -40,6 +40,8 @@ const (
 	shutdownGracePeriod = 5 * time.Second
 )
 
+var gcMutex sync.Mutex // global mutex to protect gc from running concurrently in a hot reload
+
 // Processor is a tail-sampling event processor.
 type Processor struct {
 	config Config
@@ -386,6 +388,8 @@ func (p *Processor) Run() error {
 		}
 	})
 	g.Go(func() error {
+		gcMutex.Lock()
+		defer gcMutex.Unlock()
 		// This goroutine is responsible for periodically garbage
 		// collecting the Badger value log, using the recommended
 		// discard ratio of 0.5.

From 503f8ccf3b4c2fb4858f2ff51710884fa5938eb2 Mon Sep 17 00:00:00 2001
From: Carson Ip
Date: Fri, 11 Oct 2024 11:58:11 +0100
Subject: [PATCH 2/7] Add changelog

---
 changelogs/head.asciidoc | 1 +
 1 file changed, 1 insertion(+)

diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc
index ab1a5e60bcd..f0ca48b4a1c 100644
--- a/changelogs/head.asciidoc
+++ b/changelogs/head.asciidoc
@@ -7,6 +7,7 @@ https://github.com/elastic/apm-server/compare/8.15\...main[View commits]
 ==== Bug fixes
 
 - Track all bulk request response status codes {pull}13574[13574]
+- Tail-based sampling: Fix rare gc thread failure after EA hot reload, causing unbounded storage size growth {pull}13574[13574]
 
 [float]
 ==== Breaking Changes

From b18d34eb26d971df93e03dd94cf188bb32347ceb Mon Sep 17 00:00:00 2001
From: Carson Ip
Date: Fri, 11 Oct 2024 11:59:19 +0100
Subject: [PATCH 3/7] Improve comment

---
 x-pack/apm-server/sampling/processor.go | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/x-pack/apm-server/sampling/processor.go b/x-pack/apm-server/sampling/processor.go
index 4f5731be56a..2888d1df6e1 100644
--- a/x-pack/apm-server/sampling/processor.go
+++ b/x-pack/apm-server/sampling/processor.go
@@ -40,7 +40,10 @@ const (
 	shutdownGracePeriod = 5 * time.Second
 )
 
-var gcMutex sync.Mutex // global mutex to protect gc from running concurrently in a hot reload
+var (
+	// gcMutex is a global mutex to protect gc from running concurrently when 2 TBS processors are active during a hot reload
+	gcMutex sync.Mutex
+)
 
 // Processor is a tail-sampling event processor.
 type Processor struct {
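Note: patches 1 and 3 serialize the Badger value-log GC with a package-level mutex so that, when an Elastic Agent hot reload briefly leaves two tail-based sampling processors alive, only one of them runs GC at a time. The standalone sketch below shows that pattern in isolation; runValueLogGCLoop, db, interval and stop are illustrative names rather than apm-server code, and it assumes the Badger v2 RunValueLogGC/ErrNoRewrite API.

package gcguard

import (
	"errors"
	"sync"
	"time"

	badger "github.com/dgraph-io/badger/v2"
)

// gcMutex is package-level so every processor instance in the same process
// shares it: at most one value-log GC loop can hold it at a time.
var gcMutex sync.Mutex

// runValueLogGCLoop periodically garbage collects the Badger value log with
// the recommended discard ratio of 0.5, holding the shared mutex for the
// lifetime of the loop. It returns when stop is closed.
func runValueLogGCLoop(db *badger.DB, interval time.Duration, stop <-chan struct{}) error {
	gcMutex.Lock()
	defer gcMutex.Unlock()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return nil
		case <-ticker.C:
			// ErrNoRewrite only means there was nothing worth rewriting yet.
			if err := db.RunValueLogGC(0.5); err != nil && !errors.Is(err, badger.ErrNoRewrite) {
				return err
			}
		}
	}
}

The limitation that patch 6 later addresses is already visible here: once another instance holds gcMutex, Lock blocks unconditionally and cannot be abandoned when stop is closed.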
From 25e5b5de4383e52309b2f783747e96df5abe28c7 Mon Sep 17 00:00:00 2001
From: Carson Ip
Date: Fri, 11 Oct 2024 12:14:05 +0100
Subject: [PATCH 4/7] Update changelog

---
 changelogs/head.asciidoc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc
index f0ca48b4a1c..8d09d240906 100644
--- a/changelogs/head.asciidoc
+++ b/changelogs/head.asciidoc
@@ -7,7 +7,7 @@ https://github.com/elastic/apm-server/compare/8.15\...main[View commits]
 ==== Bug fixes
 
 - Track all bulk request response status codes {pull}13574[13574]
-- Tail-based sampling: Fix rare gc thread failure after EA hot reload, causing unbounded storage size growth {pull}13574[13574]
+- Tail-based sampling: Fix rare gc thread failure after EA hot reload causing storage not reclaimed and stuck with "storage limit reached" {pull}13574[13574]
 
 [float]
 ==== Breaking Changes

From f654fba6426da82a38b63d84d80ae90e129aca57 Mon Sep 17 00:00:00 2001
From: Carson Ip
Date: Wed, 16 Oct 2024 17:08:00 +0100
Subject: [PATCH 5/7] Add a subscriber file mutex

---
 x-pack/apm-server/sampling/processor.go | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/x-pack/apm-server/sampling/processor.go b/x-pack/apm-server/sampling/processor.go
index 2888d1df6e1..68d72d93925 100644
--- a/x-pack/apm-server/sampling/processor.go
+++ b/x-pack/apm-server/sampling/processor.go
@@ -418,7 +418,9 @@ func (p *Processor) Run() error {
 	})
 	g.Go(func() error {
 		// Subscribe to remotely sampled trace IDs. This is cancelled immediately when
-		// Stop is called. The next subscriber will pick up from the previous position.
+		// Stop is called. But it is possible that both old and new subscriber goroutines
+		// run concurrently, before the old one eventually receives the Stop call.
+		// The next subscriber will pick up from the previous position.
 		defer close(remoteSampledTraceIDs)
 		defer close(subscriberPositions)
 		ctx, cancel := context.WithCancel(context.Background())
@@ -565,7 +567,13 @@ func (p *Processor) Run() error {
 	return nil
 }
 
+// subscriberPositionFileMutex protects the subscriber file from concurrent RW, in case of hot reload.
+var subscriberPositionFileMutex sync.Mutex
+
 func readSubscriberPosition(logger *logp.Logger, storageDir string) (pubsub.SubscriberPosition, error) {
+	subscriberPositionFileMutex.Lock()
+	defer subscriberPositionFileMutex.Unlock()
+
 	var pos pubsub.SubscriberPosition
 	data, err := os.ReadFile(filepath.Join(storageDir, subscriberPositionFile))
 	if errors.Is(err, os.ErrNotExist) {
@@ -586,6 +594,9 @@ func writeSubscriberPosition(storageDir string, pos pubsub.SubscriberPosition) e
 	if err != nil {
 		return err
 	}
+
+	subscriberPositionFileMutex.Lock()
+	defer subscriberPositionFileMutex.Unlock()
 	return os.WriteFile(filepath.Join(storageDir, subscriberPositionFile), data, 0644)
 }
 
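Note: patch 5 guards the subscriber position file because, during a hot reload, the old and new subscriber goroutines can overlap and read or write the same file at the same time. A simplified, self-contained version of that idea follows; the position type, the file name and the function names here are placeholders, not the real apm-server/pubsub types.

package subpos

import (
	"encoding/json"
	"errors"
	"os"
	"path/filepath"
	"sync"
)

// position stands in for pubsub.SubscriberPosition in this sketch.
type position struct {
	Index int64 `json:"index"`
}

// fileMutex serializes reads and writes of the position file so that two
// processor instances (old and new, during a hot reload) cannot interleave.
var fileMutex sync.Mutex

func readPosition(storageDir string) (position, error) {
	fileMutex.Lock()
	defer fileMutex.Unlock()

	var pos position
	data, err := os.ReadFile(filepath.Join(storageDir, "subscriber_position.json"))
	if errors.Is(err, os.ErrNotExist) {
		return pos, nil // no position recorded yet: start from the beginning
	} else if err != nil {
		return pos, err
	}
	return pos, json.Unmarshal(data, &pos)
}

func writePosition(storageDir string, pos position) error {
	data, err := json.Marshal(pos)
	if err != nil {
		return err
	}

	fileMutex.Lock()
	defer fileMutex.Unlock()
	return os.WriteFile(filepath.Join(storageDir, "subscriber_position.json"), data, 0644)
}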
From 038ff5be2e25314d43aa5cd2b67f204383ad3820 Mon Sep 17 00:00:00 2001
From: Carson Ip
Date: Wed, 16 Oct 2024 17:15:15 +0100
Subject: [PATCH 6/7] Use channel instead of mutex

---
 x-pack/apm-server/sampling/processor.go | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/x-pack/apm-server/sampling/processor.go b/x-pack/apm-server/sampling/processor.go
index 68d72d93925..9402408b299 100644
--- a/x-pack/apm-server/sampling/processor.go
+++ b/x-pack/apm-server/sampling/processor.go
@@ -41,8 +41,8 @@ const (
 )
 
 var (
-	// gcMutex is a global mutex to protect gc from running concurrently when 2 TBS processors are active during a hot reload
-	gcMutex sync.Mutex
+	// gcCh works like a global mutex to protect gc from running concurrently when 2 TBS processors are active during a hot reload
+	gcCh = make(chan struct{}, 1)
 )
 
 // Processor is a tail-sampling event processor.
@@ -391,8 +391,16 @@ func (p *Processor) Run() error {
 		}
 	})
 	g.Go(func() error {
-		gcMutex.Lock()
-		defer gcMutex.Unlock()
+		// Protect this goroutine from running concurrently when 2 TBS processors are active
+		// as badger GC is not concurrent safe.
+		select {
+		case <-p.stopping:
+			return nil
+		case gcCh <- struct{}{}:
+		}
+		defer func() {
+			<-gcCh
+		}()
 		// This goroutine is responsible for periodically garbage
 		// collecting the Badger value log, using the recommended
 		// discard ratio of 0.5.
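Note: patch 6 replaces the mutex with a buffered channel of capacity one. The apparent reason is cancellation: sync.Mutex.Lock cannot be abandoned once called, so a processor that is being stopped could block indefinitely behind the other instance's GC goroutine, whereas a channel send can be raced against the stopping channel in a select. A minimal sketch of that acquire/release pattern, with hypothetical names:

package gcguard

// gcSlot behaves like a global, cancellable mutex: the single buffer slot can
// be held by at most one goroutine at a time.
var gcSlot = make(chan struct{}, 1)

// acquire takes the slot unless stopping is closed first, and reports whether
// the slot was acquired.
func acquire(stopping <-chan struct{}) bool {
	select {
	case <-stopping:
		return false
	case gcSlot <- struct{}{}:
		return true
	}
}

// release frees the slot so another processor instance may run GC.
func release() { <-gcSlot }

Usage mirrors the patch: the GC goroutine returns nil immediately when acquire reports false, and defers release for as long as it holds the slot.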
From b744adf3c56a93ae297ddb2922d90ed3698ffd38 Mon Sep 17 00:00:00 2001
From: Carson Ip
Date: Thu, 17 Oct 2024 17:25:19 +0100
Subject: [PATCH 7/7] Add test

---
 x-pack/apm-server/sampling/processor_test.go | 26 ++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/x-pack/apm-server/sampling/processor_test.go b/x-pack/apm-server/sampling/processor_test.go
index f17500da501..e0bf38f77b2 100644
--- a/x-pack/apm-server/sampling/processor_test.go
+++ b/x-pack/apm-server/sampling/processor_test.go
@@ -22,6 +22,7 @@ import (
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
 	"google.golang.org/protobuf/testing/protocmp"
 
 	"github.com/elastic/apm-data/model/modelpb"
@@ -668,6 +669,31 @@ func TestStorageGC(t *testing.T) {
 	t.Fatal("timed out waiting for value log garbage collection")
 }
 
+func TestStorageGCConcurrency(t *testing.T) {
+	// This test ensures that TBS processor does not return an error
+	// even when run concurrently e.g. in hot reload
+	if testing.Short() {
+		t.Skip("skipping slow test")
+	}
+
+	config := newTempdirConfig(t)
+	config.TTL = 10 * time.Millisecond
+	config.FlushInterval = 10 * time.Millisecond
+	config.StorageGCInterval = 10 * time.Millisecond
+
+	g := errgroup.Group{}
+	for i := 0; i < 2; i++ {
+		processor, err := sampling.NewProcessor(config)
+		require.NoError(t, err)
+		g.Go(processor.Run)
+		go func() {
+			time.Sleep(time.Second)
+			assert.NoError(t, processor.Stop(context.Background()))
+		}()
+	}
+	assert.NoError(t, g.Wait())
+}
+
 func TestStorageLimit(t *testing.T) {
 	// This test ensures that when tail sampling is configured with a hard
 	// storage limit, the limit is respected once the size is available.