Skip to content

Commit fb729b8

Browse files
[8.x] Translate otel metrics to libbeat monitoring (backport #15094) (#15440)
* Translate otel metrics to libbeat monitoring (#15094) * Translate otel metrics to libbeat monitoring * demo: send metrics directly and add another reader * Revert "demo: send metrics directly and add another reader" This reverts commit 166a717. * lint: fix linter issues * lint: fix linter issues * feat: refactor code to propagate meterprovider and fix tests * lint: fix linter issues * refactor: remove unused files * refactor: remove more global meters * refactor: cleanup more unused code * refactor: remove more unused code * test: account for agentcfg metric in test * test: account for agentcfg metric in test * fix: pass meter provider in main func * Fix LSM metrics tests * test: fix remaining test * lint: fix linter issues * fix: update docappender metrics name * test: update systemtest metric assertions * fix: update grpc interceptor meter path metrics were not being exported because the meter was not recognized as apm-server meter * fix: add back output.type=elasticsearch * test: upate remaining systemtest * test: remove debug print * fix: use correct outputRegistry variable fix panic * fix: remove panic on err * fix: propagate meter provider to grpc services * lint: add meterprovider field docs * lint: fix comment typo * feat: pass apmotel gatherer too * refactor: use switch pattern consistently when mapping metrics * Update beater.go * Update beat.go * Update beater.go * fix: solve compile errors and systemtest fix * refactor: reduce diff noise * lint: fix linter issues * lint: fix linter issues * Update x-pack/apm-server/main.go Co-authored-by: Andrew Wilkins <axwalk@gmail.com> * test: use legacy metrics for validating grpc tests * fix: unregister callback if available forgot to pus this --------- Co-authored-by: Andrew Wilkins <axw@elastic.co> Co-authored-by: Andrew Wilkins <axwalk@gmail.com> (cherry picked from commit 378b60c) # Conflicts: # internal/beatcmd/beat.go # internal/beater/beater.go # internal/beater/server.go # internal/beater/server_test.go * fix: resolve conflicts --------- Co-authored-by: kruskall <99559985+kruskall@users.noreply.github.com> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
1 parent 682a499 commit fb729b8

48 files changed

Lines changed: 909 additions & 1306 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

internal/agentcfg/elasticsearch.go

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ import (
2929

3030
"github.com/pkg/errors"
3131
"go.elastic.co/apm/v2"
32+
"go.opentelemetry.io/otel/metric"
3233

3334
"github.com/elastic/apm-server/internal/elasticsearch"
3435
"github.com/elastic/apm-server/internal/logs"
3536
"github.com/elastic/elastic-agent-libs/logp"
36-
"github.com/elastic/elastic-agent-libs/monitoring"
3737
"github.com/elastic/go-elasticsearch/v8/esapi"
3838
)
3939

@@ -71,22 +71,34 @@ type ElasticsearchFetcher struct {
7171

7272
logger, rateLimitedLogger *logp.Logger
7373

74-
tracer *apm.Tracer
75-
metrics fetcherMetrics
76-
}
74+
tracer *apm.Tracer
7775

78-
type fetcherMetrics struct {
79-
fetchES, fetchFallback, fetchFallbackUnavailable, fetchInvalid,
80-
cacheRefreshSuccesses, cacheRefreshFailures,
81-
cacheEntriesCount atomic.Int64
76+
esCacheEntriesCount metric.Int64Gauge
77+
esFetchCount metric.Int64Counter
78+
esFetchFallbackCount metric.Int64Counter
79+
esFetchUnavailableCount metric.Int64Counter
80+
esFetchInvalidCount metric.Int64Counter
81+
esCacheRefreshSuccesses metric.Int64Counter
82+
esCacheRefreshFailures metric.Int64Counter
8283
}
8384

8485
func NewElasticsearchFetcher(
8586
client *elasticsearch.Client,
8687
cacheDuration time.Duration,
8788
fetcher Fetcher,
8889
tracer *apm.Tracer,
90+
mp metric.MeterProvider,
8991
) *ElasticsearchFetcher {
92+
meter := mp.Meter("github.com/elastic/apm-server/internal/agentcfg")
93+
94+
esCacheEntriesCount, _ := meter.Int64Gauge("apm-server.agentcfg.elasticsearch.cache.entries.count")
95+
esFetchCount, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.fetch.es")
96+
esFetchFallbackCount, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.fetch.fallback")
97+
esFetchUnavailableCount, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.fetch.unavailable")
98+
esFetchInvalidCount, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.fetch.invalid")
99+
esCacheRefreshSuccesses, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.cache.refresh.successes")
100+
esCacheRefreshFailures, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.cache.refresh.failures")
101+
90102
logger := logp.NewLogger("agentcfg")
91103
return &ElasticsearchFetcher{
92104
client: client,
@@ -96,6 +108,14 @@ func NewElasticsearchFetcher(
96108
logger: logger,
97109
rateLimitedLogger: logger.WithOptions(logs.WithRateLimit(loggerRateLimit)),
98110
tracer: tracer,
111+
112+
esCacheEntriesCount: esCacheEntriesCount,
113+
esFetchCount: esFetchCount,
114+
esFetchFallbackCount: esFetchFallbackCount,
115+
esFetchUnavailableCount: esFetchUnavailableCount,
116+
esFetchInvalidCount: esFetchInvalidCount,
117+
esCacheRefreshSuccesses: esCacheRefreshSuccesses,
118+
esCacheRefreshFailures: esCacheRefreshFailures,
99119
}
100120
}
101121

@@ -105,22 +125,22 @@ func (f *ElasticsearchFetcher) Fetch(ctx context.Context, query Query) (Result,
105125
// Happy path: serve fetch requests using an initialized cache.
106126
f.mu.RLock()
107127
defer f.mu.RUnlock()
108-
f.metrics.fetchES.Add(1)
128+
f.esFetchCount.Add(ctx, 1)
109129
return matchAgentConfig(query, f.cache), nil
110130
}
111131

112132
if f.fallbackFetcher != nil {
113-
f.metrics.fetchFallback.Add(1)
133+
f.esFetchFallbackCount.Add(ctx, 1)
114134
return f.fallbackFetcher.Fetch(ctx, query)
115135
}
116136

117137
if f.invalidESCfg.Load() {
118-
f.metrics.fetchInvalid.Add(1)
138+
f.esFetchInvalidCount.Add(ctx, 1)
119139
f.rateLimitedLogger.Errorf("rejecting fetch request: no valid elasticsearch config")
120140
return Result{}, errors.New(ErrNoValidElasticsearchConfig)
121141
}
122142

123-
f.metrics.fetchFallbackUnavailable.Add(1)
143+
f.esFetchUnavailableCount.Add(ctx, 1)
124144
f.rateLimitedLogger.Warnf("rejecting fetch request: infrastructure is not ready")
125145
return Result{}, errors.New(ErrInfrastructureNotReady)
126146
}
@@ -213,9 +233,9 @@ func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) {
213233

214234
defer func() {
215235
if err != nil {
216-
f.metrics.cacheRefreshFailures.Add(1)
236+
f.esCacheRefreshFailures.Add(ctx, 1)
217237
} else {
218-
f.metrics.cacheRefreshSuccesses.Add(1)
238+
f.esCacheRefreshSuccesses.Add(ctx, 1)
219239
}
220240
}()
221241

@@ -247,7 +267,7 @@ func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) {
247267
f.cache = buffer
248268
f.mu.Unlock()
249269
f.cacheInitialized.Store(true)
250-
f.metrics.cacheEntriesCount.Store(int64(len(f.cache)))
270+
f.esCacheEntriesCount.Record(ctx, int64(len(f.cache)))
251271
f.last = time.Now()
252272
return nil
253273
}
@@ -304,20 +324,3 @@ func (f *ElasticsearchFetcher) singlePageRefresh(ctx context.Context, scrollID s
304324
}
305325
return result, json.NewDecoder(resp.Body).Decode(&result)
306326
}
307-
308-
// CollectMonitoring may be called to collect monitoring metrics from the
309-
// fetcher. It is intended to be used with libbeat/monitoring.NewFunc.
310-
//
311-
// The metrics should be added to the "apm-server.agentcfg.elasticsearch" registry.
312-
func (f *ElasticsearchFetcher) CollectMonitoring(_ monitoring.Mode, V monitoring.Visitor) {
313-
V.OnRegistryStart()
314-
defer V.OnRegistryFinished()
315-
316-
monitoring.ReportInt(V, "cache.entries.count", f.metrics.cacheEntriesCount.Load())
317-
monitoring.ReportInt(V, "fetch.es", f.metrics.fetchES.Load())
318-
monitoring.ReportInt(V, "fetch.fallback", f.metrics.fetchFallback.Load())
319-
monitoring.ReportInt(V, "fetch.unavailable", f.metrics.fetchFallbackUnavailable.Load())
320-
monitoring.ReportInt(V, "fetch.invalid", f.metrics.fetchInvalid.Load())
321-
monitoring.ReportInt(V, "cache.refresh.successes", f.metrics.cacheRefreshSuccesses.Load())
322-
monitoring.ReportInt(V, "cache.refresh.failures", f.metrics.cacheRefreshFailures.Load())
323-
}

internal/agentcfg/elasticsearch_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/stretchr/testify/require"
3131
"go.elastic.co/apm/v2"
3232
"go.elastic.co/apm/v2/apmtest"
33+
"go.opentelemetry.io/otel/metric/noop"
3334

3435
"github.com/elastic/apm-server/internal/elasticsearch"
3536
)
@@ -104,7 +105,7 @@ func newElasticsearchFetcher(
104105
w.WriteHeader(200)
105106
w.Write(b)
106107
i += searchSize
107-
}), time.Second, nil, rt)
108+
}), time.Second, nil, rt, noop.NewMeterProvider())
108109
fetcher.searchSize = searchSize
109110
return fetcher
110111
}
@@ -193,6 +194,7 @@ func TestFetchUseFallback(t *testing.T) {
193194
time.Second,
194195
fallbackFetcher,
195196
apmtest.NewRecordingTracer().Tracer,
197+
noop.NewMeterProvider(),
196198
)
197199

198200
fetcher.refreshCache(context.Background())
@@ -208,6 +210,7 @@ func TestFetchNoFallbackInvalidESCfg(t *testing.T) {
208210
time.Second,
209211
nil,
210212
apmtest.NewRecordingTracer().Tracer,
213+
noop.NewMeterProvider(),
211214
)
212215

213216
err := fetcher.refreshCache(context.Background())
@@ -224,6 +227,7 @@ func TestFetchNoFallback(t *testing.T) {
224227
time.Second,
225228
nil,
226229
apmtest.NewRecordingTracer().Tracer,
230+
noop.NewMeterProvider(),
227231
)
228232

229233
err := fetcher.refreshCache(context.Background())

internal/beatcmd/beat.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,22 @@ func (b *Beat) registerStatsMetrics() {
539539
}
540540
}
541541
})
542+
monitoring.NewFunc(monitoring.Default, "apm-server", func(_ monitoring.Mode, v monitoring.Visitor) {
543+
var rm metricdata.ResourceMetrics
544+
if err := b.metricReader.Collect(context.Background(), &rm); err != nil {
545+
return
546+
}
547+
v.OnRegistryStart()
548+
defer v.OnRegistryFinished()
549+
for _, sm := range rm.ScopeMetrics {
550+
switch {
551+
case strings.HasPrefix(sm.Scope.Name, "github.com/elastic/apm-server"):
552+
// All simple scalar metrics that begin with the name "apm-server."
553+
// in github.com/elastic/apm-server/... scopes are mapped directly.
554+
addAPMServerMetrics(v, sm)
555+
}
556+
}
557+
})
542558
}
543559

544560
// getScalarInt64 returns a single-value, dimensionless
@@ -560,6 +576,44 @@ func getScalarInt64(data metricdata.Aggregation) (int64, bool) {
560576
return 0, false
561577
}
562578

579+
func addAPMServerMetrics(v monitoring.Visitor, sm metricdata.ScopeMetrics) {
580+
beatsMetrics := make(map[string]any)
581+
for _, m := range sm.Metrics {
582+
if suffix, ok := strings.CutPrefix(m.Name, "apm-server."); ok {
583+
if value, ok := getScalarInt64(m.Data); ok {
584+
current := beatsMetrics
585+
suffixSlice := strings.Split(suffix, ".")
586+
for i := 0; i < len(suffixSlice)-1; i++ {
587+
k := suffixSlice[i]
588+
if _, ok := current[k]; !ok {
589+
current[k] = make(map[string]any)
590+
}
591+
if currentmap, ok := current[k].(map[string]any); ok {
592+
current = currentmap
593+
}
594+
}
595+
current[suffixSlice[len(suffixSlice)-1]] = value
596+
}
597+
}
598+
}
599+
600+
reportOnKey(v, beatsMetrics)
601+
}
602+
603+
func reportOnKey(v monitoring.Visitor, m map[string]any) {
604+
for key, value := range m {
605+
if valueMap, ok := value.(map[string]any); ok {
606+
v.OnRegistryStart()
607+
v.OnKey(key)
608+
reportOnKey(v, valueMap)
609+
v.OnRegistryFinished()
610+
}
611+
if valueMetric, ok := value.(int64); ok {
612+
monitoring.ReportInt(v, key, valueMetric)
613+
}
614+
}
615+
}
616+
563617
// Adapt go-docappender's OTel metrics to beats stack monitoring metrics,
564618
// with a mixture of libbeat-specific and apm-server specific metric names.
565619
func addDocappenderLibbeatOutputMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) {

internal/beatcmd/reloader.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -254,9 +254,11 @@ func (r *Reloader) reload(inputConfig, outputConfig, apmTracingConfig *config.C)
254254
// allow the runner to perform initialisations that must run
255255
// synchronously.
256256
newRunner, err := r.newRunner(RunnerParams{
257-
Config: mergedConfig,
258-
Info: r.info,
259-
Logger: r.logger,
257+
Config: mergedConfig,
258+
Info: r.info,
259+
Logger: r.logger,
260+
MeterProvider: r.meterProvider,
261+
MetricsGatherer: r.metricGatherer,
260262
})
261263
if err != nil {
262264
return err

internal/beater/api/config/agent/handler.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ import (
2626

2727
"github.com/pkg/errors"
2828

29-
"github.com/elastic/elastic-agent-libs/monitoring"
30-
3129
"github.com/elastic/apm-server/internal/agentcfg"
3230
"github.com/elastic/apm-server/internal/beater/auth"
3331
"github.com/elastic/apm-server/internal/beater/headers"
@@ -43,10 +41,6 @@ const (
4341
)
4442

4543
var (
46-
// MonitoringMap holds a mapping for request.IDs to monitoring counters
47-
MonitoringMap = request.DefaultMonitoringMapForRegistry(registry)
48-
registry = monitoring.Default.NewRegistry("apm-server.acm")
49-
5044
errCacheControl = fmt.Sprintf("max-age=%v, must-revalidate", errMaxAgeDuration.Seconds())
5145
)
5246

internal/beater/api/intake/handler.go

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@
1818
package intake
1919

2020
import (
21+
"context"
2122
"errors"
2223
"fmt"
2324
"net/http"
2425
"strings"
2526

26-
"github.com/elastic/elastic-agent-libs/monitoring"
27+
"go.opentelemetry.io/otel/metric"
2728

2829
"github.com/elastic/apm-data/input/elasticapm"
2930
"github.com/elastic/apm-data/model/modelpb"
@@ -39,15 +40,6 @@ const (
3940
)
4041

4142
var (
42-
// MonitoringMap holds a mapping for request.IDs to monitoring counters
43-
MonitoringMap = request.DefaultMonitoringMapForRegistry(registry)
44-
registry = monitoring.Default.NewRegistry("apm-server.server")
45-
46-
streamRegistry = monitoring.Default.NewRegistry("apm-server.processor.stream")
47-
eventsAccepted = monitoring.NewInt(streamRegistry, "accepted")
48-
eventsInvalid = monitoring.NewInt(streamRegistry, "errors.invalid")
49-
eventsTooLarge = monitoring.NewInt(streamRegistry, "errors.toolarge")
50-
5143
errMethodNotAllowed = errors.New("only POST requests are supported")
5244
errServerShuttingDown = errors.New("server is shutting down")
5345
errInvalidContentType = errors.New("invalid content type")
@@ -59,7 +51,12 @@ var (
5951
type RequestMetadataFunc func(*request.Context) *modelpb.APMEvent
6052

6153
// Handler returns a request.Handler for managing intake requests for backend and rum events.
62-
func Handler(handler elasticapm.StreamHandler, requestMetadataFunc RequestMetadataFunc, batchProcessor modelpb.BatchProcessor) request.Handler {
54+
func Handler(mp metric.MeterProvider, handler elasticapm.StreamHandler, requestMetadataFunc RequestMetadataFunc, batchProcessor modelpb.BatchProcessor) request.Handler {
55+
meter := mp.Meter("github.com/elastic/apm-server/internal/beater/api/intake")
56+
eventsAccepted, _ := meter.Int64Counter("apm-server.processor.stream.accepted")
57+
eventsInvalid, _ := meter.Int64Counter("apm-server.processor.stream.errors.invalid")
58+
eventsTooLarge, _ := meter.Int64Counter("apm-server.processor.stream.errors.toolarge")
59+
6360
return func(c *request.Context) {
6461
if err := validateRequest(c); err != nil {
6562
writeError(c, err)
@@ -82,9 +79,9 @@ func Handler(handler elasticapm.StreamHandler, requestMetadataFunc RequestMetada
8279
batchProcessor,
8380
&result,
8481
)
85-
eventsAccepted.Add(int64(result.Accepted))
86-
eventsInvalid.Add(int64(result.Invalid))
87-
eventsTooLarge.Add(int64(result.TooLarge))
82+
eventsAccepted.Add(context.Background(), int64(result.Accepted))
83+
eventsInvalid.Add(context.Background(), int64(result.Invalid))
84+
eventsTooLarge.Add(context.Background(), int64(result.TooLarge))
8885
writeStreamResult(c, result, err)
8986
}
9087
}

0 commit comments

Comments
 (0)