From 91645514597b0dcff19b14d6344934b60c23764c Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Wed, 8 May 2024 18:28:07 +0100 Subject: [PATCH] Add configuration option for remapping otel metrics --- input/otlp/consumer.go | 18 +++++++++++++----- input/otlp/hostmetrics/memory.go | 8 +++++--- input/otlp/metrics.go | 14 ++++++++------ 3 files changed, 26 insertions(+), 14 deletions(-) diff --git a/input/otlp/consumer.go b/input/otlp/consumer.go index 9716c6fb..b6fea898 100644 --- a/input/otlp/consumer.go +++ b/input/otlp/consumer.go @@ -46,6 +46,10 @@ type ConsumerConfig struct { // Semaphore holds a semaphore on which Processor.HandleStream will acquire a // token before proceeding, to limit concurrency. Semaphore input.Semaphore + + // RemapOTelMetrics remaps certain OpenTelemetry metrics to elastic metrics. + // Note that both, OTel and Elastic, metrics would be published. + RemapOTelMetrics bool } // Consumer transforms OpenTelemetry data to the Elastic APM data model, @@ -64,12 +68,16 @@ func NewConsumer(config ConsumerConfig) *Consumer { } else { config.Logger = config.Logger.Named("otel") } - return &Consumer{ - config: config, - sem: config.Semaphore, - remappers: []remapper{ + var remappers []remapper + if config.RemapOTelMetrics { + remappers = []remapper{ hostmetrics.NewRemapper(config.Logger), - }, + } + } + return &Consumer{ + config: config, + sem: config.Semaphore, + remappers: remappers, } } diff --git a/input/otlp/hostmetrics/memory.go b/input/otlp/hostmetrics/memory.go index a080823d..1f18a93c 100644 --- a/input/otlp/hostmetrics/memory.go +++ b/input/otlp/hostmetrics/memory.go @@ -10,10 +10,12 @@ func remapMemoryMetrics(src, out pmetric.MetricSlice, dataset string) error { var total, free, cached, usedBytes, actualFree, actualUsedBytes int64 var usedPercent, actualUsedPercent float64 - // iterate all metrics in the current scope and generate the additional Elastic system integration metrics + // iterate all metrics in the current scope and generate the additional Elastic system + // integration metrics for i := 0; i < src.Len(); i++ { metric := src.At(i) - // TODO (lahisvjar): If system.memory.utilization is not present then derive it from system.memory.usage + // TODO (lahisvjar): If system.memory.utilization is not present then derive it from + // system.memory.usage. if metric.Name() == "system.memory.usage" { dataPoints := metric.Sum().DataPoints() for j := 0; j < dataPoints.Len(); j++ { @@ -29,7 +31,7 @@ func remapMemoryMetrics(src, out pmetric.MetricSlice, dataset string) error { total += value case "free": // TODO (lahsivjar): if system.linux.memory.available is present then use it for - // free mem calculation + // free mem calculation. free = value usedBytes -= value total += value diff --git a/input/otlp/metrics.go b/input/otlp/metrics.go index e1fa3a9b..a7189c9d 100644 --- a/input/otlp/metrics.go +++ b/input/otlp/metrics.go @@ -146,12 +146,14 @@ func (c *Consumer) handleScopeMetrics( // Handle remapping if any. Remapped metrics will be added to a new // metric slice and then processed as any other metric in the scope. // TODO (lahsivjar): Possible to approximate capacity of the slice? - remappedMetrics := pmetric.NewMetricSlice() - for _, r := range c.remappers { - r.Remap(in, remappedMetrics) - } - for i := 0; i < remappedMetrics.Len(); i++ { - c.addMetric(remappedMetrics.At(i), ms, remainingDataPoints, remainingMetrics) + if len(c.remappers) > 0 { + remappedMetrics := pmetric.NewMetricSlice() + for _, r := range c.remappers { + r.Remap(in, remappedMetrics) + } + for i := 0; i < remappedMetrics.Len(); i++ { + c.addMetric(remappedMetrics.At(i), ms, remainingDataPoints, remainingMetrics) + } } // Process all the metrics added to the metricset. for key, ms := range ms {