Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x] Translate otel metrics to libbeat monitoring (backport #15094) #15440

Merged
merged 4 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 35 additions & 32 deletions internal/agentcfg/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ import (

"github.com/pkg/errors"
"go.elastic.co/apm/v2"
"go.opentelemetry.io/otel/metric"

"github.com/elastic/apm-server/internal/elasticsearch"
"github.com/elastic/apm-server/internal/logs"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/go-elasticsearch/v8/esapi"
)

Expand Down Expand Up @@ -71,22 +71,34 @@ type ElasticsearchFetcher struct {

logger, rateLimitedLogger *logp.Logger

tracer *apm.Tracer
metrics fetcherMetrics
}
tracer *apm.Tracer

type fetcherMetrics struct {
fetchES, fetchFallback, fetchFallbackUnavailable, fetchInvalid,
cacheRefreshSuccesses, cacheRefreshFailures,
cacheEntriesCount atomic.Int64
esCacheEntriesCount metric.Int64Gauge
esFetchCount metric.Int64Counter
esFetchFallbackCount metric.Int64Counter
esFetchUnavailableCount metric.Int64Counter
esFetchInvalidCount metric.Int64Counter
esCacheRefreshSuccesses metric.Int64Counter
esCacheRefreshFailures metric.Int64Counter
}

func NewElasticsearchFetcher(
client *elasticsearch.Client,
cacheDuration time.Duration,
fetcher Fetcher,
tracer *apm.Tracer,
mp metric.MeterProvider,
) *ElasticsearchFetcher {
meter := mp.Meter("github.com/elastic/apm-server/internal/agentcfg")

esCacheEntriesCount, _ := meter.Int64Gauge("apm-server.agentcfg.elasticsearch.cache.entries.count")
esFetchCount, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.fetch.es")
esFetchFallbackCount, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.fetch.fallback")
esFetchUnavailableCount, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.fetch.unavailable")
esFetchInvalidCount, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.fetch.invalid")
esCacheRefreshSuccesses, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.cache.refresh.successes")
esCacheRefreshFailures, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.cache.refresh.failures")

logger := logp.NewLogger("agentcfg")
return &ElasticsearchFetcher{
client: client,
Expand All @@ -96,6 +108,14 @@ func NewElasticsearchFetcher(
logger: logger,
rateLimitedLogger: logger.WithOptions(logs.WithRateLimit(loggerRateLimit)),
tracer: tracer,

esCacheEntriesCount: esCacheEntriesCount,
esFetchCount: esFetchCount,
esFetchFallbackCount: esFetchFallbackCount,
esFetchUnavailableCount: esFetchUnavailableCount,
esFetchInvalidCount: esFetchInvalidCount,
esCacheRefreshSuccesses: esCacheRefreshSuccesses,
esCacheRefreshFailures: esCacheRefreshFailures,
}
}

Expand All @@ -105,22 +125,22 @@ func (f *ElasticsearchFetcher) Fetch(ctx context.Context, query Query) (Result,
// Happy path: serve fetch requests using an initialized cache.
f.mu.RLock()
defer f.mu.RUnlock()
f.metrics.fetchES.Add(1)
f.esFetchCount.Add(ctx, 1)
return matchAgentConfig(query, f.cache), nil
}

if f.fallbackFetcher != nil {
f.metrics.fetchFallback.Add(1)
f.esFetchFallbackCount.Add(ctx, 1)
return f.fallbackFetcher.Fetch(ctx, query)
}

if f.invalidESCfg.Load() {
f.metrics.fetchInvalid.Add(1)
f.esFetchInvalidCount.Add(ctx, 1)
f.rateLimitedLogger.Errorf("rejecting fetch request: no valid elasticsearch config")
return Result{}, errors.New(ErrNoValidElasticsearchConfig)
}

f.metrics.fetchFallbackUnavailable.Add(1)
f.esFetchUnavailableCount.Add(ctx, 1)
f.rateLimitedLogger.Warnf("rejecting fetch request: infrastructure is not ready")
return Result{}, errors.New(ErrInfrastructureNotReady)
}
Expand Down Expand Up @@ -213,9 +233,9 @@ func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) {

defer func() {
if err != nil {
f.metrics.cacheRefreshFailures.Add(1)
f.esCacheRefreshFailures.Add(ctx, 1)
} else {
f.metrics.cacheRefreshSuccesses.Add(1)
f.esCacheRefreshSuccesses.Add(ctx, 1)
}
}()

Expand Down Expand Up @@ -247,7 +267,7 @@ func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) {
f.cache = buffer
f.mu.Unlock()
f.cacheInitialized.Store(true)
f.metrics.cacheEntriesCount.Store(int64(len(f.cache)))
f.esCacheEntriesCount.Record(ctx, int64(len(f.cache)))
f.last = time.Now()
return nil
}
Expand Down Expand Up @@ -304,20 +324,3 @@ func (f *ElasticsearchFetcher) singlePageRefresh(ctx context.Context, scrollID s
}
return result, json.NewDecoder(resp.Body).Decode(&result)
}

// CollectMonitoring may be called to collect monitoring metrics from the
// fetcher. It is intended to be used with libbeat/monitoring.NewFunc.
//
// The metrics should be added to the "apm-server.agentcfg.elasticsearch" registry.
func (f *ElasticsearchFetcher) CollectMonitoring(_ monitoring.Mode, V monitoring.Visitor) {
V.OnRegistryStart()
defer V.OnRegistryFinished()

monitoring.ReportInt(V, "cache.entries.count", f.metrics.cacheEntriesCount.Load())
monitoring.ReportInt(V, "fetch.es", f.metrics.fetchES.Load())
monitoring.ReportInt(V, "fetch.fallback", f.metrics.fetchFallback.Load())
monitoring.ReportInt(V, "fetch.unavailable", f.metrics.fetchFallbackUnavailable.Load())
monitoring.ReportInt(V, "fetch.invalid", f.metrics.fetchInvalid.Load())
monitoring.ReportInt(V, "cache.refresh.successes", f.metrics.cacheRefreshSuccesses.Load())
monitoring.ReportInt(V, "cache.refresh.failures", f.metrics.cacheRefreshFailures.Load())
}
6 changes: 5 additions & 1 deletion internal/agentcfg/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/stretchr/testify/require"
"go.elastic.co/apm/v2"
"go.elastic.co/apm/v2/apmtest"
"go.opentelemetry.io/otel/metric/noop"

"github.com/elastic/apm-server/internal/elasticsearch"
)
Expand Down Expand Up @@ -104,7 +105,7 @@ func newElasticsearchFetcher(
w.WriteHeader(200)
w.Write(b)
i += searchSize
}), time.Second, nil, rt)
}), time.Second, nil, rt, noop.NewMeterProvider())
fetcher.searchSize = searchSize
return fetcher
}
Expand Down Expand Up @@ -193,6 +194,7 @@ func TestFetchUseFallback(t *testing.T) {
time.Second,
fallbackFetcher,
apmtest.NewRecordingTracer().Tracer,
noop.NewMeterProvider(),
)

fetcher.refreshCache(context.Background())
Expand All @@ -208,6 +210,7 @@ func TestFetchNoFallbackInvalidESCfg(t *testing.T) {
time.Second,
nil,
apmtest.NewRecordingTracer().Tracer,
noop.NewMeterProvider(),
)

err := fetcher.refreshCache(context.Background())
Expand All @@ -224,6 +227,7 @@ func TestFetchNoFallback(t *testing.T) {
time.Second,
nil,
apmtest.NewRecordingTracer().Tracer,
noop.NewMeterProvider(),
)

err := fetcher.refreshCache(context.Background())
Expand Down
54 changes: 54 additions & 0 deletions internal/beatcmd/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,22 @@ func (b *Beat) registerStatsMetrics() {
}
}
})
monitoring.NewFunc(monitoring.Default, "apm-server", func(_ monitoring.Mode, v monitoring.Visitor) {
var rm metricdata.ResourceMetrics
if err := b.metricReader.Collect(context.Background(), &rm); err != nil {
return
}
v.OnRegistryStart()
defer v.OnRegistryFinished()
for _, sm := range rm.ScopeMetrics {
switch {
case strings.HasPrefix(sm.Scope.Name, "github.com/elastic/apm-server"):
// All simple scalar metrics that begin with the name "apm-server."
// in github.com/elastic/apm-server/... scopes are mapped directly.
addAPMServerMetrics(v, sm)
}
}
})
}

// getScalarInt64 returns a single-value, dimensionless
Expand All @@ -560,6 +576,44 @@ func getScalarInt64(data metricdata.Aggregation) (int64, bool) {
return 0, false
}

func addAPMServerMetrics(v monitoring.Visitor, sm metricdata.ScopeMetrics) {
beatsMetrics := make(map[string]any)
for _, m := range sm.Metrics {
if suffix, ok := strings.CutPrefix(m.Name, "apm-server."); ok {
if value, ok := getScalarInt64(m.Data); ok {
current := beatsMetrics
suffixSlice := strings.Split(suffix, ".")
for i := 0; i < len(suffixSlice)-1; i++ {
k := suffixSlice[i]
if _, ok := current[k]; !ok {
current[k] = make(map[string]any)
}
if currentmap, ok := current[k].(map[string]any); ok {
current = currentmap
}
}
current[suffixSlice[len(suffixSlice)-1]] = value
}
}
}

reportOnKey(v, beatsMetrics)
}

func reportOnKey(v monitoring.Visitor, m map[string]any) {
for key, value := range m {
if valueMap, ok := value.(map[string]any); ok {
v.OnRegistryStart()
v.OnKey(key)
reportOnKey(v, valueMap)
v.OnRegistryFinished()
}
if valueMetric, ok := value.(int64); ok {
monitoring.ReportInt(v, key, valueMetric)
}
}
}

// Adapt go-docappender's OTel metrics to beats stack monitoring metrics,
// with a mixture of libbeat-specific and apm-server specific metric names.
func addDocappenderLibbeatOutputMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) {
Expand Down
8 changes: 5 additions & 3 deletions internal/beatcmd/reloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,11 @@ func (r *Reloader) reload(inputConfig, outputConfig, apmTracingConfig *config.C)
// allow the runner to perform initialisations that must run
// synchronously.
newRunner, err := r.newRunner(RunnerParams{
Config: mergedConfig,
Info: r.info,
Logger: r.logger,
Config: mergedConfig,
Info: r.info,
Logger: r.logger,
MeterProvider: r.meterProvider,
MetricsGatherer: r.metricGatherer,
})
if err != nil {
return err
Expand Down
6 changes: 0 additions & 6 deletions internal/beater/api/config/agent/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (

"github.com/pkg/errors"

"github.com/elastic/elastic-agent-libs/monitoring"

"github.com/elastic/apm-server/internal/agentcfg"
"github.com/elastic/apm-server/internal/beater/auth"
"github.com/elastic/apm-server/internal/beater/headers"
Expand All @@ -43,10 +41,6 @@ const (
)

var (
// MonitoringMap holds a mapping for request.IDs to monitoring counters
MonitoringMap = request.DefaultMonitoringMapForRegistry(registry)
registry = monitoring.Default.NewRegistry("apm-server.acm")

errCacheControl = fmt.Sprintf("max-age=%v, must-revalidate", errMaxAgeDuration.Seconds())
)

Expand Down
25 changes: 11 additions & 14 deletions internal/beater/api/intake/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
package intake

import (
"context"
"errors"
"fmt"
"net/http"
"strings"

"github.com/elastic/elastic-agent-libs/monitoring"
"go.opentelemetry.io/otel/metric"

"github.com/elastic/apm-data/input/elasticapm"
"github.com/elastic/apm-data/model/modelpb"
Expand All @@ -39,15 +40,6 @@ const (
)

var (
// MonitoringMap holds a mapping for request.IDs to monitoring counters
MonitoringMap = request.DefaultMonitoringMapForRegistry(registry)
registry = monitoring.Default.NewRegistry("apm-server.server")

streamRegistry = monitoring.Default.NewRegistry("apm-server.processor.stream")
eventsAccepted = monitoring.NewInt(streamRegistry, "accepted")
eventsInvalid = monitoring.NewInt(streamRegistry, "errors.invalid")
eventsTooLarge = monitoring.NewInt(streamRegistry, "errors.toolarge")

errMethodNotAllowed = errors.New("only POST requests are supported")
errServerShuttingDown = errors.New("server is shutting down")
errInvalidContentType = errors.New("invalid content type")
Expand All @@ -59,7 +51,12 @@ var (
type RequestMetadataFunc func(*request.Context) *modelpb.APMEvent

// Handler returns a request.Handler for managing intake requests for backend and rum events.
func Handler(handler elasticapm.StreamHandler, requestMetadataFunc RequestMetadataFunc, batchProcessor modelpb.BatchProcessor) request.Handler {
func Handler(mp metric.MeterProvider, handler elasticapm.StreamHandler, requestMetadataFunc RequestMetadataFunc, batchProcessor modelpb.BatchProcessor) request.Handler {
meter := mp.Meter("github.com/elastic/apm-server/internal/beater/api/intake")
eventsAccepted, _ := meter.Int64Counter("apm-server.processor.stream.accepted")
eventsInvalid, _ := meter.Int64Counter("apm-server.processor.stream.errors.invalid")
eventsTooLarge, _ := meter.Int64Counter("apm-server.processor.stream.errors.toolarge")

return func(c *request.Context) {
if err := validateRequest(c); err != nil {
writeError(c, err)
Expand All @@ -82,9 +79,9 @@ func Handler(handler elasticapm.StreamHandler, requestMetadataFunc RequestMetada
batchProcessor,
&result,
)
eventsAccepted.Add(int64(result.Accepted))
eventsInvalid.Add(int64(result.Invalid))
eventsTooLarge.Add(int64(result.TooLarge))
eventsAccepted.Add(context.Background(), int64(result.Accepted))
eventsInvalid.Add(context.Background(), int64(result.Invalid))
eventsTooLarge.Add(context.Background(), int64(result.TooLarge))
writeStreamResult(c, result, err)
}
}
Expand Down
Loading
Loading