Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Translate otel metrics to libbeat monitoring #15094

Merged
merged 48 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
7bcfa46
Translate otel metrics to libbeat monitoring
axw Dec 4, 2024
0491b4c
Merge remote-tracking branch 'upstream/main' into otel-adapter
kruskall Dec 31, 2024
166a717
demo: send metrics directly and add another reader
kruskall Jan 1, 2025
11b11d8
Revert "demo: send metrics directly and add another reader"
kruskall Jan 1, 2025
128b5b9
lint: fix linter issues
kruskall Jan 1, 2025
0adb6ab
Merge branch 'main' into otel-adapter
kruskall Jan 15, 2025
642a89b
lint: fix linter issues
kruskall Jan 15, 2025
15d6652
feat: refactor code to propagate meterprovider and fix tests
kruskall Jan 21, 2025
bfedda8
Merge remote-tracking branch 'upstream/main' into otel-adapter
kruskall Jan 21, 2025
cc914d0
lint: fix linter issues
kruskall Jan 21, 2025
4e9110d
refactor: remove unused files
kruskall Jan 21, 2025
63a49cc
refactor: remove more global meters
kruskall Jan 21, 2025
64fe558
refactor: cleanup more unused code
kruskall Jan 21, 2025
df2e35c
Merge branch 'main' into otel-adapter
kruskall Jan 21, 2025
3931dab
refactor: remove more unused code
kruskall Jan 21, 2025
750668f
test: account for agentcfg metric in test
kruskall Jan 21, 2025
fe73501
test: account for agentcfg metric in test
kruskall Jan 21, 2025
ee6fc70
fix: pass meter provider in main func
kruskall Jan 21, 2025
8d8b066
Fix LSM metrics tests
axw Jan 22, 2025
9821bcc
test: fix remaining test
kruskall Jan 22, 2025
50bcc39
lint: fix linter issues
kruskall Jan 22, 2025
b4b9631
fix: update docappender metrics name
kruskall Jan 22, 2025
b275098
test: update systemtest metric assertions
kruskall Jan 22, 2025
8155566
fix: update grpc interceptor meter path
kruskall Jan 22, 2025
4ae20a0
fix: add back output.type=elasticsearch
kruskall Jan 22, 2025
3b4fc56
test: update remaining systemtest
kruskall Jan 22, 2025
62e0c25
test: remove debug print
kruskall Jan 22, 2025
ee22add
Merge branch 'main' into otel-adapter
kruskall Jan 22, 2025
928ccf8
fix: use correct outputRegistry variable
kruskall Jan 22, 2025
268e4d7
Merge branch 'main' into otel-adapter
kruskall Jan 23, 2025
46ed7f7
fix: remove panic on err
kruskall Jan 23, 2025
c459533
fix: propagate meter provider to grpc services
kruskall Jan 23, 2025
dd8ccf9
lint: add meterprovider field docs
kruskall Jan 23, 2025
bc9ec45
lint: fix comment typo
kruskall Jan 23, 2025
87d42ba
feat: pass apmotel gatherer too
kruskall Jan 23, 2025
f400792
refactor: use switch pattern consistently when mapping metrics
kruskall Jan 23, 2025
775b0ec
Update beater.go
kruskall Jan 24, 2025
243e90f
Update beat.go
kruskall Jan 24, 2025
1d9793f
Update beater.go
kruskall Jan 24, 2025
3575c50
Merge branch 'main' into otel-adapter
kruskall Jan 24, 2025
08501f2
fix: solve compile errors and systemtest fix
kruskall Jan 24, 2025
ea2260f
refactor: reduce diff noise
kruskall Jan 24, 2025
83bc51b
lint: fix linter issues
kruskall Jan 24, 2025
929f011
lint: fix linter issues
kruskall Jan 24, 2025
1f2322b
Update x-pack/apm-server/main.go
kruskall Jan 27, 2025
06080ca
test: use legacy metrics for validating grpc tests
kruskall Jan 27, 2025
45f84a7
fix: unregister callback if available
kruskall Jan 27, 2025
c95e71d
Merge branch 'main' into otel-adapter
kruskall Jan 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 35 additions & 32 deletions internal/agentcfg/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ import (

"github.com/pkg/errors"
"go.elastic.co/apm/v2"
"go.opentelemetry.io/otel/metric"

"github.com/elastic/apm-server/internal/elasticsearch"
"github.com/elastic/apm-server/internal/logs"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/go-elasticsearch/v8/esapi"
)

Expand Down Expand Up @@ -71,22 +71,34 @@ type ElasticsearchFetcher struct {

logger, rateLimitedLogger *logp.Logger

tracer *apm.Tracer
metrics fetcherMetrics
}
tracer *apm.Tracer

type fetcherMetrics struct {
fetchES, fetchFallback, fetchFallbackUnavailable, fetchInvalid,
cacheRefreshSuccesses, cacheRefreshFailures,
cacheEntriesCount atomic.Int64
esCacheEntriesCount metric.Int64Gauge
esFetchCount metric.Int64Counter
esFetchFallbackCount metric.Int64Counter
esFetchUnavailableCount metric.Int64Counter
esFetchInvalidCount metric.Int64Counter
esCacheRefreshSuccesses metric.Int64Counter
esCacheRefreshFailures metric.Int64Counter
}

func NewElasticsearchFetcher(
client *elasticsearch.Client,
cacheDuration time.Duration,
fetcher Fetcher,
tracer *apm.Tracer,
mp metric.MeterProvider,
) *ElasticsearchFetcher {
meter := mp.Meter("github.com/elastic/apm-server/internal/agentcfg")

esCacheEntriesCount, _ := meter.Int64Gauge("apm-server.agentcfg.elasticsearch.cache.entries.count")
esFetchCount, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.fetch.es")
esFetchFallbackCount, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.fetch.fallback")
esFetchUnavailableCount, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.fetch.unavailable")
esFetchInvalidCount, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.fetch.invalid")
esCacheRefreshSuccesses, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.cache.refresh.successes")
esCacheRefreshFailures, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.cache.refresh.failures")

logger := logp.NewLogger("agentcfg")
return &ElasticsearchFetcher{
client: client,
Expand All @@ -96,6 +108,14 @@ func NewElasticsearchFetcher(
logger: logger,
rateLimitedLogger: logger.WithOptions(logs.WithRateLimit(loggerRateLimit)),
tracer: tracer,

esCacheEntriesCount: esCacheEntriesCount,
esFetchCount: esFetchCount,
esFetchFallbackCount: esFetchFallbackCount,
esFetchUnavailableCount: esFetchUnavailableCount,
esFetchInvalidCount: esFetchInvalidCount,
esCacheRefreshSuccesses: esCacheRefreshSuccesses,
esCacheRefreshFailures: esCacheRefreshFailures,
}
}

Expand All @@ -105,22 +125,22 @@ func (f *ElasticsearchFetcher) Fetch(ctx context.Context, query Query) (Result,
// Happy path: serve fetch requests using an initialized cache.
f.mu.RLock()
defer f.mu.RUnlock()
f.metrics.fetchES.Add(1)
f.esFetchCount.Add(ctx, 1)
return matchAgentConfig(query, f.cache), nil
}

if f.fallbackFetcher != nil {
f.metrics.fetchFallback.Add(1)
f.esFetchFallbackCount.Add(ctx, 1)
return f.fallbackFetcher.Fetch(ctx, query)
}

if f.invalidESCfg.Load() {
f.metrics.fetchInvalid.Add(1)
f.esFetchInvalidCount.Add(ctx, 1)
f.rateLimitedLogger.Errorf("rejecting fetch request: no valid elasticsearch config")
return Result{}, errors.New(ErrNoValidElasticsearchConfig)
}

f.metrics.fetchFallbackUnavailable.Add(1)
f.esFetchUnavailableCount.Add(ctx, 1)
f.rateLimitedLogger.Warnf("rejecting fetch request: infrastructure is not ready")
return Result{}, errors.New(ErrInfrastructureNotReady)
}
Expand Down Expand Up @@ -213,9 +233,9 @@ func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) {

defer func() {
if err != nil {
f.metrics.cacheRefreshFailures.Add(1)
f.esCacheRefreshFailures.Add(ctx, 1)
} else {
f.metrics.cacheRefreshSuccesses.Add(1)
f.esCacheRefreshSuccesses.Add(ctx, 1)
}
}()

Expand Down Expand Up @@ -247,7 +267,7 @@ func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) {
f.cache = buffer
f.mu.Unlock()
f.cacheInitialized.Store(true)
f.metrics.cacheEntriesCount.Store(int64(len(f.cache)))
f.esCacheEntriesCount.Record(ctx, int64(len(f.cache)))
f.last = time.Now()
return nil
}
Expand Down Expand Up @@ -304,20 +324,3 @@ func (f *ElasticsearchFetcher) singlePageRefresh(ctx context.Context, scrollID s
}
return result, json.NewDecoder(resp.Body).Decode(&result)
}

// CollectMonitoring reports the fetcher's counters to the given visitor.
// It has the signature expected by libbeat/monitoring.NewFunc and is meant
// to be registered that way.
//
// The metrics should be added to the "apm-server.agentcfg.elasticsearch" registry.
func (f *ElasticsearchFetcher) CollectMonitoring(_ monitoring.Mode, V monitoring.Visitor) {
	V.OnRegistryStart()
	defer V.OnRegistryFinished()

	m := &f.metrics
	// Each counter is loaded immediately before it is reported, in the
	// order listed here.
	reported := []struct {
		key     string
		counter *atomic.Int64
	}{
		{"cache.entries.count", &m.cacheEntriesCount},
		{"fetch.es", &m.fetchES},
		{"fetch.fallback", &m.fetchFallback},
		{"fetch.unavailable", &m.fetchFallbackUnavailable},
		{"fetch.invalid", &m.fetchInvalid},
		{"cache.refresh.successes", &m.cacheRefreshSuccesses},
		{"cache.refresh.failures", &m.cacheRefreshFailures},
	}
	for _, r := range reported {
		monitoring.ReportInt(V, r.key, r.counter.Load())
	}
}
6 changes: 5 additions & 1 deletion internal/agentcfg/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/stretchr/testify/require"
"go.elastic.co/apm/v2"
"go.elastic.co/apm/v2/apmtest"
"go.opentelemetry.io/otel/metric/noop"

"github.com/elastic/apm-server/internal/elasticsearch"
)
Expand Down Expand Up @@ -104,7 +105,7 @@ func newElasticsearchFetcher(
w.WriteHeader(200)
w.Write(b)
i += searchSize
}), time.Second, nil, rt)
}), time.Second, nil, rt, noop.NewMeterProvider())
fetcher.searchSize = searchSize
return fetcher
}
Expand Down Expand Up @@ -193,6 +194,7 @@ func TestFetchUseFallback(t *testing.T) {
time.Second,
fallbackFetcher,
apmtest.NewRecordingTracer().Tracer,
noop.NewMeterProvider(),
)

fetcher.refreshCache(context.Background())
Expand All @@ -208,6 +210,7 @@ func TestFetchNoFallbackInvalidESCfg(t *testing.T) {
time.Second,
nil,
apmtest.NewRecordingTracer().Tracer,
noop.NewMeterProvider(),
)

err := fetcher.refreshCache(context.Background())
Expand All @@ -224,6 +227,7 @@ func TestFetchNoFallback(t *testing.T) {
time.Second,
nil,
apmtest.NewRecordingTracer().Tracer,
noop.NewMeterProvider(),
)

err := fetcher.refreshCache(context.Background())
Expand Down
34 changes: 34 additions & 0 deletions internal/beatcmd/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,22 @@ func (b *Beat) registerStatsMetrics() {
}
}
})
monitoring.NewFunc(monitoring.Default, "apm-server", func(_ monitoring.Mode, v monitoring.Visitor) {
var rm metricdata.ResourceMetrics
if err := b.metricReader.Collect(context.Background(), &rm); err != nil {
return
}
v.OnRegistryStart()
defer v.OnRegistryFinished()
for _, sm := range rm.ScopeMetrics {
switch {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why use a switch statement here? Do we expect to handle more cases in the future?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see above comment, it's for consistency

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aligning all of these monitoring functions to use if statements also sounds fine; I was mostly addressing the inconsistency between checks of the same kind.

case strings.HasPrefix(sm.Scope.Name, "github.com/elastic/apm-server"):
// All simple scalar metrics that begin with the name "apm-server."
// in github.com/elastic/apm-server/... scopes are mapped directly.
addAPMServerMetrics(v, sm)
}
}
})
}

// getScalarInt64 returns a single-value, dimensionless
Expand All @@ -560,6 +576,24 @@ func getScalarInt64(data metricdata.Aggregation) (int64, bool) {
return 0, false
}

// addAPMServerMetrics reports to v every metric in sm whose name begins with
// "apm-server." and for which getScalarInt64 yields a value; all other
// metrics are ignored.
//
// The remainder of the name after the prefix is split on ".". For each
// segment except the last, OnRegistryStart and OnKey are called on v; the
// last segment is the key under which the value is reported. One
// OnRegistryFinished call is then made per leading segment.
func addAPMServerMetrics(v monitoring.Visitor, sm metricdata.ScopeMetrics) {
	const prefix = "apm-server."
	for _, m := range sm.Metrics {
		name, found := strings.CutPrefix(m.Name, prefix)
		if !found {
			continue
		}
		value, isScalar := getScalarInt64(m.Data)
		if !isScalar {
			continue
		}

		// strings.Split always returns at least one element here, so the
		// leaf index is valid even when the name contains no dots.
		segments := strings.Split(name, ".")
		last := len(segments) - 1
		parents, leaf := segments[:last], segments[last]

		for _, key := range parents {
			v.OnRegistryStart()
			v.OnKey(key)
		}
		monitoring.ReportInt(v, leaf, value)
		for range parents {
			v.OnRegistryFinished()
		}
	}
}

// Adapt go-docappender's OTel metrics to beats stack monitoring metrics,
// with a mixture of libbeat-specific and apm-server specific metric names.
func addDocappenderLibbeatOutputMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) {
Expand Down
8 changes: 5 additions & 3 deletions internal/beatcmd/reloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,11 @@ func (r *Reloader) reload(inputConfig, outputConfig, apmTracingConfig *config.C)
// allow the runner to perform initialisations that must run
// synchronously.
newRunner, err := r.newRunner(RunnerParams{
Config: mergedConfig,
Info: r.info,
Logger: r.logger,
Config: mergedConfig,
Info: r.info,
Logger: r.logger,
MeterProvider: r.meterProvider,
MetricsGatherer: r.metricGatherer,
})
if err != nil {
return err
Expand Down
6 changes: 0 additions & 6 deletions internal/beater/api/config/agent/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (

"github.com/pkg/errors"

"github.com/elastic/elastic-agent-libs/monitoring"

"github.com/elastic/apm-server/internal/agentcfg"
"github.com/elastic/apm-server/internal/beater/auth"
"github.com/elastic/apm-server/internal/beater/headers"
Expand All @@ -43,10 +41,6 @@ const (
)

var (
// MonitoringMap holds a mapping for request.IDs to monitoring counters
MonitoringMap = request.DefaultMonitoringMapForRegistry(registry)
registry = monitoring.Default.NewRegistry("apm-server.acm")

errCacheControl = fmt.Sprintf("max-age=%v, must-revalidate", errMaxAgeDuration.Seconds())
)

Expand Down
25 changes: 11 additions & 14 deletions internal/beater/api/intake/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
package intake

import (
"context"
"errors"
"fmt"
"net/http"
"strings"

"github.com/elastic/elastic-agent-libs/monitoring"
"go.opentelemetry.io/otel/metric"

"github.com/elastic/apm-data/input/elasticapm"
"github.com/elastic/apm-data/model/modelpb"
Expand All @@ -39,15 +40,6 @@ const (
)

var (
// MonitoringMap holds a mapping for request.IDs to monitoring counters
MonitoringMap = request.DefaultMonitoringMapForRegistry(registry)
registry = monitoring.Default.NewRegistry("apm-server.server")

streamRegistry = monitoring.Default.NewRegistry("apm-server.processor.stream")
eventsAccepted = monitoring.NewInt(streamRegistry, "accepted")
eventsInvalid = monitoring.NewInt(streamRegistry, "errors.invalid")
eventsTooLarge = monitoring.NewInt(streamRegistry, "errors.toolarge")

errMethodNotAllowed = errors.New("only POST requests are supported")
errServerShuttingDown = errors.New("server is shutting down")
errInvalidContentType = errors.New("invalid content type")
Expand All @@ -59,7 +51,12 @@ var (
type RequestMetadataFunc func(*request.Context) *modelpb.APMEvent

// Handler returns a request.Handler for managing intake requests for backend and rum events.
func Handler(handler elasticapm.StreamHandler, requestMetadataFunc RequestMetadataFunc, batchProcessor modelpb.BatchProcessor) request.Handler {
func Handler(mp metric.MeterProvider, handler elasticapm.StreamHandler, requestMetadataFunc RequestMetadataFunc, batchProcessor modelpb.BatchProcessor) request.Handler {
meter := mp.Meter("github.com/elastic/apm-server/internal/beater/api/intake")
eventsAccepted, _ := meter.Int64Counter("apm-server.processor.stream.accepted")
eventsInvalid, _ := meter.Int64Counter("apm-server.processor.stream.errors.invalid")
eventsTooLarge, _ := meter.Int64Counter("apm-server.processor.stream.errors.toolarge")

return func(c *request.Context) {
if err := validateRequest(c); err != nil {
writeError(c, err)
Expand All @@ -82,9 +79,9 @@ func Handler(handler elasticapm.StreamHandler, requestMetadataFunc RequestMetada
batchProcessor,
&result,
)
eventsAccepted.Add(int64(result.Accepted))
eventsInvalid.Add(int64(result.Invalid))
eventsTooLarge.Add(int64(result.TooLarge))
eventsAccepted.Add(context.Background(), int64(result.Accepted))
kruskall marked this conversation as resolved.
Show resolved Hide resolved
eventsInvalid.Add(context.Background(), int64(result.Invalid))
eventsTooLarge.Add(context.Background(), int64(result.TooLarge))
writeStreamResult(c, result, err)
}
}
Expand Down
29 changes: 19 additions & 10 deletions internal/beater/api/intake/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/metric/noop"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"golang.org/x/sync/semaphore"

"github.com/elastic/apm-data/input/elasticapm"
"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/apm-server/internal/beater/config"
"github.com/elastic/apm-server/internal/beater/headers"
"github.com/elastic/apm-server/internal/beater/monitoringtest"
"github.com/elastic/apm-server/internal/beater/request"
"github.com/elastic/apm-server/internal/model/modelprocessor"
"github.com/elastic/apm-server/internal/publish"
Expand Down Expand Up @@ -141,7 +145,7 @@ func TestIntakeHandler(t *testing.T) {
tc.setup(t)

// call handler
h := Handler(tc.processor, emptyRequestMetadata, tc.batchProcessor)
h := Handler(noop.NewMeterProvider(), tc.processor, emptyRequestMetadata, tc.batchProcessor)
h(tc.c)

require.Equal(t, string(tc.id), string(tc.c.Result.ID))
Expand All @@ -163,10 +167,6 @@ func TestIntakeHandler(t *testing.T) {
}

func TestIntakeHandlerMonitoring(t *testing.T) {
eventsAccepted.Set(1)
eventsInvalid.Set(2)
eventsTooLarge.Set(3)

streamHandler := streamHandlerFunc(func(
ctx context.Context,
base *modelpb.APMEvent,
Expand All @@ -181,15 +181,24 @@ func TestIntakeHandlerMonitoring(t *testing.T) {
return errors.New("something bad happened at the end")
})

h := Handler(streamHandler, emptyRequestMetadata, modelprocessor.Nop{})
reader := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector(
func(ik sdkmetric.InstrumentKind) metricdata.Temporality {
return metricdata.DeltaTemporality
},
))
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))

h := Handler(mp, streamHandler, emptyRequestMetadata, modelprocessor.Nop{})
req := httptest.NewRequest("POST", "/", nil)
c := request.NewContext()
c.Reset(httptest.NewRecorder(), req)
h(c)

assert.Equal(t, int64(11), eventsAccepted.Get())
assert.Equal(t, int64(102), eventsInvalid.Get())
assert.Equal(t, int64(1003), eventsTooLarge.Get())
monitoringtest.ExpectOtelMetrics(t, reader, map[string]any{
"apm-server.processor.stream.accepted": 10,
"apm-server.processor.stream.errors.invalid": 100,
"apm-server.processor.stream.errors.toolarge": 1000,
})
}

func TestIntakeHandlerContentType(t *testing.T) {
Expand All @@ -206,7 +215,7 @@ func TestIntakeHandlerContentType(t *testing.T) {
}

tc.setup(t)
h := Handler(tc.processor, emptyRequestMetadata, tc.batchProcessor)
h := Handler(noop.NewMeterProvider(), tc.processor, emptyRequestMetadata, tc.batchProcessor)
h(tc.c)
assert.Equal(t, tc.code, tc.w.Code, tc.c.Result.Err)
}
Expand Down
Loading
Loading