Skip to content

Commit

Permalink
Add a subscriber file mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonip committed Oct 16, 2024
1 parent 25e5b5d commit f654fba
Showing 1 changed file with 12 additions and 1 deletion.
13 changes: 12 additions & 1 deletion x-pack/apm-server/sampling/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,9 @@ func (p *Processor) Run() error {
})
g.Go(func() error {
// Subscribe to remotely sampled trace IDs. This is cancelled immediately when
// Stop is called. The next subscriber will pick up from the previous position.
// Stop is called. But it is possible that both old and new subscriber goroutines
// run concurrently, before the old one eventually receives the Stop call.
// The next subscriber will pick up from the previous position.
defer close(remoteSampledTraceIDs)
defer close(subscriberPositions)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -565,7 +567,13 @@ func (p *Processor) Run() error {
return nil
}

// subscriberPositionFileMutex protects the subscriber file from concurrent RW, in case of hot reload.
var subscriberPositionFileMutex sync.Mutex

func readSubscriberPosition(logger *logp.Logger, storageDir string) (pubsub.SubscriberPosition, error) {
subscriberPositionFileMutex.Lock()
defer subscriberPositionFileMutex.Unlock()

var pos pubsub.SubscriberPosition
data, err := os.ReadFile(filepath.Join(storageDir, subscriberPositionFile))
if errors.Is(err, os.ErrNotExist) {
Expand All @@ -586,6 +594,9 @@ func writeSubscriberPosition(storageDir string, pos pubsub.SubscriberPosition) e
if err != nil {
return err
}

subscriberPositionFileMutex.Lock()
defer subscriberPositionFileMutex.Unlock()
return os.WriteFile(filepath.Join(storageDir, subscriberPositionFile), data, 0644)
}

Expand Down

0 comments on commit f654fba

Please sign in to comment.