Skip to content

Commit

Permalink
fix: always use background context to update metrics (#15516)
Browse files Browse the repository at this point in the history
Propagating the request context means the metric might not
be updated if the context is already done (canceled or expired).
  • Loading branch information
kruskall authored Jan 31, 2025
1 parent 826c2bb commit b62fd62
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 43 deletions.
14 changes: 7 additions & 7 deletions internal/agentcfg/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,22 +125,22 @@ func (f *ElasticsearchFetcher) Fetch(ctx context.Context, query Query) (Result,
// Happy path: serve fetch requests using an initialized cache.
f.mu.RLock()
defer f.mu.RUnlock()
f.esFetchCount.Add(ctx, 1)
f.esFetchCount.Add(context.Background(), 1)
return matchAgentConfig(query, f.cache), nil
}

if f.fallbackFetcher != nil {
f.esFetchFallbackCount.Add(ctx, 1)
f.esFetchFallbackCount.Add(context.Background(), 1)
return f.fallbackFetcher.Fetch(ctx, query)
}

if f.invalidESCfg.Load() {
f.esFetchInvalidCount.Add(ctx, 1)
f.esFetchInvalidCount.Add(context.Background(), 1)
f.rateLimitedLogger.Errorf("rejecting fetch request: no valid elasticsearch config")
return Result{}, errors.New(ErrNoValidElasticsearchConfig)
}

f.esFetchUnavailableCount.Add(ctx, 1)
f.esFetchUnavailableCount.Add(context.Background(), 1)
f.rateLimitedLogger.Warnf("rejecting fetch request: infrastructure is not ready")
return Result{}, errors.New(ErrInfrastructureNotReady)
}
Expand Down Expand Up @@ -233,9 +233,9 @@ func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) {

defer func() {
if err != nil {
f.esCacheRefreshFailures.Add(ctx, 1)
f.esCacheRefreshFailures.Add(context.Background(), 1)
} else {
f.esCacheRefreshSuccesses.Add(ctx, 1)
f.esCacheRefreshSuccesses.Add(context.Background(), 1)
}
}()

Expand Down Expand Up @@ -267,7 +267,7 @@ func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) {
f.cache = buffer
f.mu.Unlock()
f.cacheInitialized.Store(true)
f.esCacheEntriesCount.Record(ctx, int64(len(f.cache)))
f.esCacheEntriesCount.Record(context.Background(), int64(len(f.cache)))
f.last = time.Now()
return nil
}
Expand Down
20 changes: 10 additions & 10 deletions internal/beater/interceptors/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,36 +65,36 @@ func (m *metricsInterceptor) Interceptor() grpc.UnaryServerInterceptor {
return handler(ctx, req)
}

m.inc(ctx, legacyMetricsPrefix, request.IDRequestCount)
defer m.inc(ctx, legacyMetricsPrefix, request.IDResponseCount)
m.inc(legacyMetricsPrefix, request.IDRequestCount)
defer m.inc(legacyMetricsPrefix, request.IDResponseCount)

start := time.Now()
resp, err := handler(ctx, req)
duration := time.Since(start)
m.getHistogram(requestDurationHistogram, metric.WithUnit("ms")).Record(ctx, duration.Milliseconds())
m.getHistogram(requestDurationHistogram, metric.WithUnit("ms")).Record(context.Background(), duration.Milliseconds())

responseID := request.IDResponseValidCount
if err != nil {
responseID = request.IDResponseErrorsCount
if s, ok := status.FromError(err); ok {
switch s.Code() {
case codes.Unauthenticated:
m.inc(ctx, legacyMetricsPrefix, request.IDResponseErrorsUnauthorized)
m.inc(legacyMetricsPrefix, request.IDResponseErrorsUnauthorized)
case codes.DeadlineExceeded, codes.Canceled:
m.inc(ctx, legacyMetricsPrefix, request.IDResponseErrorsTimeout)
m.inc(legacyMetricsPrefix, request.IDResponseErrorsTimeout)
case codes.ResourceExhausted:
m.inc(ctx, legacyMetricsPrefix, request.IDResponseErrorsRateLimit)
m.inc(legacyMetricsPrefix, request.IDResponseErrorsRateLimit)
}
}
}
m.inc(ctx, legacyMetricsPrefix, responseID)
m.inc(legacyMetricsPrefix, responseID)
return resp, err
}
}

func (m *metricsInterceptor) inc(ctx context.Context, legacyMetricsPrefix string, id request.ResultID) {
m.getCounter("grpc.server.", string(id)).Add(ctx, 1)
m.getCounter(legacyMetricsPrefix, string(id)).Add(ctx, 1)
func (m *metricsInterceptor) inc(legacyMetricsPrefix string, id request.ResultID) {
m.getCounter("grpc.server.", string(id)).Add(context.Background(), 1)
m.getCounter(legacyMetricsPrefix, string(id)).Add(context.Background(), 1)
}

func (m *metricsInterceptor) getCounter(prefix, n string) metric.Int64Counter {
Expand Down
20 changes: 9 additions & 11 deletions internal/beater/middleware/monitoring_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,30 +43,28 @@ type monitoringMiddleware struct {
func (m *monitoringMiddleware) Middleware() Middleware {
return func(h request.Handler) (request.Handler, error) {
return func(c *request.Context) {
ctx := context.Background()

m.inc(ctx, request.IDRequestCount)
m.inc(request.IDRequestCount)

start := time.Now()
h(c)
duration := time.Since(start)
m.getHistogram(requestDurationHistogram, metric.WithUnit("ms")).Record(ctx, duration.Milliseconds())
m.getHistogram(requestDurationHistogram, metric.WithUnit("ms")).Record(context.Background(), duration.Milliseconds())

m.inc(ctx, request.IDResponseCount)
m.inc(request.IDResponseCount)
if c.Result.StatusCode >= http.StatusBadRequest {
m.inc(ctx, request.IDResponseErrorsCount)
m.inc(request.IDResponseErrorsCount)
} else {
m.inc(ctx, request.IDResponseValidCount)
m.inc(request.IDResponseValidCount)
}
m.inc(ctx, c.Result.ID)
m.inc(c.Result.ID)
}, nil

}
}

func (m *monitoringMiddleware) inc(ctx context.Context, id request.ResultID) {
m.getCounter("http.server.", string(id)).Add(ctx, 1)
m.getCounter(m.legacyMetricsPrefix, string(id)).Add(ctx, 1)
func (m *monitoringMiddleware) inc(id request.ResultID) {
m.getCounter("http.server.", string(id)).Add(context.Background(), 1)
m.getCounter(m.legacyMetricsPrefix, string(id)).Add(context.Background(), 1)
}

func (m *monitoringMiddleware) getCounter(prefix, name string) metric.Int64Counter {
Expand Down
2 changes: 1 addition & 1 deletion internal/model/modelprocessor/eventcounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func NewEventCounter(mp metric.MeterProvider) *EventCounter {
// ProcessBatch counts events in b, grouping by APMEvent.Processor.Event.
func (c *EventCounter) ProcessBatch(ctx context.Context, b *modelpb.Batch) error {
for _, event := range *b {
c.eventCounters[event.Type()].Add(ctx, 1)
c.eventCounters[event.Type()].Add(context.Background(), 1)
}
return nil
}
28 changes: 14 additions & 14 deletions x-pack/apm-server/sampling/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ func (p *Processor) ProcessBatch(ctx context.Context, batch *modelpb.Batch) erro
var err error
switch event.Type() {
case modelpb.TransactionEventType:
p.eventMetrics.processed.Add(ctx, 1)
report, stored, err = p.processTransaction(ctx, event)
p.eventMetrics.processed.Add(context.Background(), 1)
report, stored, err = p.processTransaction(event)
case modelpb.SpanEventType:
p.eventMetrics.processed.Add(ctx, 1)
report, stored, err = p.processSpan(ctx, event)
p.eventMetrics.processed.Add(context.Background(), 1)
report, stored, err = p.processSpan(event)
default:
continue
}
Expand All @@ -136,18 +136,18 @@ func (p *Processor) ProcessBatch(ctx context.Context, batch *modelpb.Batch) erro
i--
}

p.updateProcessorMetrics(ctx, report, stored, failed)
p.updateProcessorMetrics(report, stored, failed)
}
*batch = events
return nil
}

func (p *Processor) updateProcessorMetrics(ctx context.Context, report, stored, failedWrite bool) {
func (p *Processor) updateProcessorMetrics(report, stored, failedWrite bool) {
if failedWrite {
p.eventMetrics.failedWrites.Add(ctx, 1)
p.eventMetrics.failedWrites.Add(context.Background(), 1)
}
if stored {
p.eventMetrics.stored.Add(ctx, 1)
p.eventMetrics.stored.Add(context.Background(), 1)
} else if !report {
// We only increment the "dropped" counter if
// we neither reported nor stored the event, so
Expand All @@ -158,15 +158,15 @@ func (p *Processor) updateProcessorMetrics(ctx context.Context, report, stored,
// The counter does not include events that are
// implicitly dropped, i.e. stored and never
// indexed.
p.eventMetrics.dropped.Add(ctx, 1)
p.eventMetrics.dropped.Add(context.Background(), 1)
}
}

func (p *Processor) processTransaction(ctx context.Context, event *modelpb.APMEvent) (report, stored bool, _ error) {
func (p *Processor) processTransaction(event *modelpb.APMEvent) (report, stored bool, _ error) {
if !event.Transaction.Sampled {
// (Head-based) unsampled transactions are passed through
// by the tail sampler.
p.eventMetrics.headUnsampled.Add(ctx, 1)
p.eventMetrics.headUnsampled.Add(context.Background(), 1)
return true, false, nil
}

Expand All @@ -177,7 +177,7 @@ func (p *Processor) processTransaction(ctx context.Context, event *modelpb.APMEv
// if it was sampled.
report := traceSampled
if report {
p.eventMetrics.sampled.Add(ctx, 1)
p.eventMetrics.sampled.Add(context.Background(), 1)
}
return report, false, nil
case eventstorage.ErrNotFound:
Expand Down Expand Up @@ -229,7 +229,7 @@ sampling policies without service name specified.
return false, true, p.eventStore.WriteTraceEvent(event.Trace.Id, event.Transaction.Id, event)
}

func (p *Processor) processSpan(ctx context.Context, event *modelpb.APMEvent) (report, stored bool, _ error) {
func (p *Processor) processSpan(event *modelpb.APMEvent) (report, stored bool, _ error) {
traceSampled, err := p.eventStore.IsTraceSampled(event.Trace.Id)
if err != nil {
if err == eventstorage.ErrNotFound {
Expand All @@ -240,7 +240,7 @@ func (p *Processor) processSpan(ctx context.Context, event *modelpb.APMEvent) (r
}
// Tail-sampling decision has been made, report or drop the event.
if traceSampled {
p.eventMetrics.sampled.Add(ctx, 1)
p.eventMetrics.sampled.Add(context.Background(), 1)
}
return traceSampled, false, nil
}
Expand Down

0 comments on commit b62fd62

Please sign in to comment.