Limit badger gc concurrency to 1 to avoid panic #14340
```diff
@@ -40,6 +40,11 @@ const (
 	shutdownGracePeriod = 5 * time.Second
 )
 
+var (
+	// gcCh works like a global mutex to protect gc from running concurrently when 2 TBS processors are active during a hot reload
+	gcCh = make(chan struct{}, 1)
+)
+
 // Processor is a tail-sampling event processor.
 type Processor struct {
 	config Config
```
```diff
@@ -386,6 +391,16 @@ func (p *Processor) Run() error {
 		}
 	})
 	g.Go(func() error {
+		// Protect this goroutine from running concurrently when 2 TBS processors are active
+		// as badger GC is not concurrent safe.
+		select {
+		case <-p.stopping:
+			return nil
+		case gcCh <- struct{}{}:
+		}
+		defer func() {
+			<-gcCh
+		}()
 		// This goroutine is responsible for periodically garbage
 		// collecting the Badger value log, using the recommended
 		// discard ratio of 0.5.
```
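The capacity-1 channel doubles as a mutex whose acquisition can be abandoned mid-wait, which a plain sync.Mutex cannot offer. The sketch below is illustrative only, not code from this PR; the worker IDs, timings, and print statements are invented. Sending into the channel acquires the slot, receiving releases it, and a select lets a waiter bail out if a stop signal arrives first.

```go
package main

import (
	"fmt"
	"time"
)

// gcSlot is a capacity-1 channel used as a mutex: sending acquires the
// slot, receiving releases it, so at most one goroutine holds it.
var gcSlot = make(chan struct{}, 1)

// runGC simulates a non-concurrent-safe GC pass. Acquisition is
// abandoned cleanly if stopping is closed before the slot frees up.
func runGC(id int, stopping <-chan struct{}) {
	select {
	case <-stopping:
		fmt.Printf("worker %d: stopped before acquiring the gc slot\n", id)
		return
	case gcSlot <- struct{}{}: // acquired
	}
	defer func() { <-gcSlot }() // release on exit

	fmt.Printf("worker %d: running gc\n", id)
	time.Sleep(100 * time.Millisecond) // stand-in for a value-log GC pass
}

func main() {
	stopping := make(chan struct{})
	done := make(chan struct{})
	// Two "processors" (as during a hot reload) race for the slot;
	// their GC passes run strictly one after the other.
	for i := 1; i <= 2; i++ {
		go func(id int) { runGC(id, stopping); done <- struct{}{} }(i)
	}
	<-done
	<-done
}
```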
```diff
@@ -411,7 +426,9 @@ func (p *Processor) Run() error {
 	})
 	g.Go(func() error {
 		// Subscribe to remotely sampled trace IDs. This is cancelled immediately when
-		// Stop is called. The next subscriber will pick up from the previous position.
+		// Stop is called. But it is possible that both old and new subscriber goroutines
+		// run concurrently, before the old one eventually receives the Stop call.
+		// The next subscriber will pick up from the previous position.
 		defer close(remoteSampledTraceIDs)
 		defer close(subscriberPositions)
 		ctx, cancel := context.WithCancel(context.Background())
```
```diff
@@ -558,7 +575,13 @@ func (p *Processor) Run() error {
 	return nil
 }
 
+// subscriberPositionFileMutex protects the subscriber file from concurrent RW, in case of hot reload.
+var subscriberPositionFileMutex sync.Mutex
+
 func readSubscriberPosition(logger *logp.Logger, storageDir string) (pubsub.SubscriberPosition, error) {
+	subscriberPositionFileMutex.Lock()
+	defer subscriberPositionFileMutex.Unlock()
+
 	var pos pubsub.SubscriberPosition
 	data, err := os.ReadFile(filepath.Join(storageDir, subscriberPositionFile))
 	if errors.Is(err, os.ErrNotExist) {
```

Review comment on `subscriberPositionFileMutex`:

[to reviewer] This ended up as a mutex over the subscriber file only, not over the subscriber goroutine. Although this means possibly duplicate work (e.g. searching in ES) during the overlap, any position written to the subscriber file is a position that has been processed, so running two subscriber goroutines concurrently does not present any correctness issues.
```diff
@@ -579,6 +602,9 @@ func writeSubscriberPosition(storageDir string, pos pubsub.SubscriberPosition) error {
 	if err != nil {
 		return err
 	}
+
+	subscriberPositionFileMutex.Lock()
+	defer subscriberPositionFileMutex.Unlock()
 	return os.WriteFile(filepath.Join(storageDir, subscriberPositionFile), data, 0644)
 }
```
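Since a hot reload spins up the new processor in the same process as the old one, a package-level mutex is enough to serialize access to the shared position file; no cross-process file locking is needed. Below is a minimal sketch of the same idea in isolation, with a hypothetical file name and payload rather than the PR's pubsub types:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
)

// positionFileMutex plays the role of subscriberPositionFileMutex:
// it serializes every read and write of the shared position file.
var positionFileMutex sync.Mutex

func writePosition(dir string, data []byte) error {
	positionFileMutex.Lock()
	defer positionFileMutex.Unlock()
	return os.WriteFile(filepath.Join(dir, "position.json"), data, 0644)
}

func readPosition(dir string) ([]byte, error) {
	positionFileMutex.Lock()
	defer positionFileMutex.Unlock()
	return os.ReadFile(filepath.Join(dir, "position.json"))
}

func main() {
	dir, err := os.MkdirTemp("", "pos")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	var wg sync.WaitGroup
	// Two "processors" overlap during a hot reload; the mutex ensures
	// each read observes a complete write, never a torn one.
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = writePosition(dir, []byte(fmt.Sprintf(`{"worker":%d}`, i)))
			_, _ = readPosition(dir)
		}(i)
	}
	wg.Wait()
}
```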
In the tests:
```diff
@@ -22,6 +22,7 @@ import (
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
 	"google.golang.org/protobuf/testing/protocmp"
 
 	"github.com/elastic/apm-data/model/modelpb"
```
```diff
@@ -668,6 +669,31 @@ func TestStorageGC(t *testing.T) {
 	t.Fatal("timed out waiting for value log garbage collection")
 }
 
+func TestStorageGCConcurrency(t *testing.T) {
+	// This test ensures that TBS processor does not return an error
+	// even when run concurrently e.g. in hot reload
+	if testing.Short() {
+		t.Skip("skipping slow test")
+	}
+
+	config := newTempdirConfig(t)
+	config.TTL = 10 * time.Millisecond
+	config.FlushInterval = 10 * time.Millisecond
+	config.StorageGCInterval = 10 * time.Millisecond
+
+	g := errgroup.Group{}
+	for i := 0; i < 2; i++ {
+		processor, err := sampling.NewProcessor(config)
+		require.NoError(t, err)
+		g.Go(processor.Run)
+		go func() {
+			time.Sleep(time.Second)
+			assert.NoError(t, processor.Stop(context.Background()))
+		}()
+	}
+	assert.NoError(t, g.Wait())
+}
+
 func TestStorageLimit(t *testing.T) {
 	// This test ensures that when tail sampling is configured with a hard
 	// storage limit, the limit is respected once the size is available.
```

Review comment on `TestStorageGCConcurrency`:

[to reviewer] Added this test to reproduce the issue. I don't know a simpler way to reproduce it than setting a short GC interval and sleeping for a second.
Review comment on the gc goroutine guard:

[to reviewer] Used a channel here instead of a sync.Mutex, just to avoid blocking the goroutine shutdown in case Stop is called. I cannot imagine a case where mu.Lock() would block the shutdown, but this errs on the safe side.
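For contrast, here is a hypothetical sync.Mutex variant of the same guard (not proposed code; the names and timings are invented): once a goroutine commits to gcMu.Lock(), there is no way to watch a stopping channel at the same time, so a Stop call cannot interrupt the wait.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// gcMu is the hypothetical mutex variant of gcCh. Lock() blocks
// unconditionally: a waiter cannot also select on a stopping channel.
var gcMu sync.Mutex

func gcLoop(id int, stopping <-chan struct{}) {
	gcMu.Lock() // no way to abandon this wait when stopping closes
	defer gcMu.Unlock()

	select {
	case <-stopping:
		fmt.Printf("gc %d: stopping\n", id)
	case <-time.After(50 * time.Millisecond):
		fmt.Printf("gc %d: finished one pass\n", id)
	}
}

func main() {
	stopping := make(chan struct{})
	var wg sync.WaitGroup
	for i := 1; i <= 2; i++ {
		wg.Add(1)
		go func(id int) { defer wg.Done(); gcLoop(id, stopping) }(i)
	}
	// Stop is requested immediately, yet the second goroutine still
	// sits in Lock() until the first one releases the mutex.
	close(stopping)
	wg.Wait()
}
```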