Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remap OTel hostmetrics to Elastic metrics #277

Merged
merged 5 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
module github.com/elastic/apm-data

go 1.21
go 1.21.1

require (
github.com/elastic/opentelemetry-lib v0.0.0-20240604140721-08863a456d6c
github.com/google/go-cmp v0.6.0
github.com/jaegertracing/jaeger v1.56.0
github.com/json-iterator/go v1.1.12
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ github.com/elastic/go-sysinfo v1.7.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6
github.com/elastic/go-windows v1.0.0/go.mod h1:TsU0Nrp7/y3+VwE82FoZF8gC/XFg/Elz6CcloAxnPgU=
github.com/elastic/go-windows v1.0.1 h1:AlYZOldA+UJ0/2nBuqWdo90GFCgG9xuyw9SYzGUtJm0=
github.com/elastic/go-windows v1.0.1/go.mod h1:FoVvqWSun28vaDQPbj2Elfc0JahhPB7WQEGa3c814Ss=
github.com/elastic/opentelemetry-lib v0.0.0-20240604140721-08863a456d6c h1:gRCjiqyIH1RxEYAJr1z9Y3KdJITip45iPprj7DbL9Mk=
github.com/elastic/opentelemetry-lib v0.0.0-20240604140721-08863a456d6c/go.mod h1:/kKvHbJLVo/NcKMPHI8/RZKL64fushmnRUzn+arQpjg=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
Expand Down
29 changes: 24 additions & 5 deletions input/otlp/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,17 @@ import (

"github.com/elastic/apm-data/input"
"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/opentelemetry-lib/remappers/hostmetrics"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)

// remapper is implemented by types that derive additional metrics from the
// metrics of a single scope.
type remapper interface {
	// Remap is handed the scope's metrics, a metric slice that receives the
	// remapper's output, and the resource the scope belongs to. The consumer
	// treats whatever ends up in out as extra metrics for that scope.
	Remap(scope pmetric.ScopeMetrics, out pmetric.MetricSlice, resource pcommon.Resource)
}

// ConsumerConfig holds configuration for Consumer.
type ConsumerConfig struct {
// Logger holds a logger for the consumer. If this is nil, then
Expand All @@ -39,14 +46,19 @@ type ConsumerConfig struct {
// Semaphore holds a semaphore on which Processor.HandleStream will acquire a
// token before proceeding, to limit concurrency.
Semaphore input.Semaphore

// RemapOTelMetrics remaps certain OpenTelemetry metrics to Elastic metrics.
// Note that both the original OTel metrics and the remapped Elastic metrics
// are published.
RemapOTelMetrics bool
}

// Consumer transforms OpenTelemetry data to the Elastic APM data model,
// sending each payload as a batch to the configured BatchProcessor.
type Consumer struct {
sem input.Semaphore
config ConsumerConfig
stats consumerStats
config ConsumerConfig
sem input.Semaphore
remappers []remapper
stats consumerStats
}

// NewConsumer returns a new Consumer with the given configuration.
Expand All @@ -56,9 +68,16 @@ func NewConsumer(config ConsumerConfig) *Consumer {
} else {
config.Logger = config.Logger.Named("otel")
}
var remappers []remapper
if config.RemapOTelMetrics {
remappers = []remapper{
hostmetrics.NewRemapper(config.Logger),
}
}
return &Consumer{
config: config,
sem: config.Semaphore,
config: config,
sem: config.Semaphore,
remappers: remappers,
}
}

Expand Down
47 changes: 41 additions & 6 deletions input/otlp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (c *Consumer) ConsumeMetricsWithResult(ctx context.Context, metrics pmetric
remainingMetrics := totalMetrics
receiveTimestamp := time.Now()
c.config.Logger.Debug("consuming metrics", zap.Stringer("metrics", metricsStringer(metrics)))
batch := c.convertMetrics(metrics, receiveTimestamp, &remainingDataPoints, &remainingMetrics)
batch := c.handleMetrics(metrics, receiveTimestamp, &remainingDataPoints, &remainingMetrics)
if remainingMetrics > 0 {
// Some metrics remained after conversion, meaning that they were dropped.
atomic.AddInt64(&c.stats.unsupportedMetricsDropped, remainingMetrics)
Expand All @@ -93,20 +93,20 @@ func (c *Consumer) ConsumeMetricsWithResult(ctx context.Context, metrics pmetric
}, nil
}

func (c *Consumer) convertMetrics(
func (c *Consumer) handleMetrics(
metrics pmetric.Metrics,
receiveTimestamp time.Time,
remainingDataPoints, remainingMetrics *int64,
) (batch *modelpb.Batch) {
batch = &modelpb.Batch{}
resourceMetrics := metrics.ResourceMetrics()
for i := 0; i < resourceMetrics.Len(); i++ {
c.convertResourceMetrics(resourceMetrics.At(i), receiveTimestamp, batch, remainingDataPoints, remainingMetrics)
c.handleResourceMetrics(resourceMetrics.At(i), receiveTimestamp, batch, remainingDataPoints, remainingMetrics)
}
return
}

func (c *Consumer) convertResourceMetrics(
func (c *Consumer) handleResourceMetrics(
resourceMetrics pmetric.ResourceMetrics,
receiveTimestamp time.Time,
out *modelpb.Batch,
Expand All @@ -125,23 +125,39 @@ func (c *Consumer) convertResourceMetrics(
}
scopeMetrics := resourceMetrics.ScopeMetrics()
for i := 0; i < scopeMetrics.Len(); i++ {
c.convertScopeMetrics(scopeMetrics.At(i), baseEvent, timeDelta, out, remainingDataPoints, remainingMetrics)
c.handleScopeMetrics(scopeMetrics.At(i), resource, baseEvent, timeDelta, out, remainingDataPoints, remainingMetrics)
}
return
}

func (c *Consumer) convertScopeMetrics(
func (c *Consumer) handleScopeMetrics(
in pmetric.ScopeMetrics,
resource pcommon.Resource,
baseEvent *modelpb.APMEvent,
timeDelta time.Duration,
out *modelpb.Batch,
remainingDataPoints, remainingMetrics *int64,
) {
ms := make(metricsets)
// Add the original otel metrics to the metricset.
otelMetrics := in.Metrics()
for i := 0; i < otelMetrics.Len(); i++ {
c.addMetric(otelMetrics.At(i), ms, remainingDataPoints, remainingMetrics)
}
// Handle remapping if any. Remapped metrics will be added to a new
// metric slice and then processed as any other metric in the scope.
if len(c.remappers) > 0 {
remappedMetrics := pmetric.NewMetricSlice()
for _, r := range c.remappers {
r.Remap(in, remappedMetrics, resource)
}
*remainingDataPoints += int64(dataPointsCount(remappedMetrics))
*remainingMetrics += int64(remappedMetrics.Len())
for i := 0; i < remappedMetrics.Len(); i++ {
c.addMetric(remappedMetrics.At(i), ms, remainingDataPoints, remainingMetrics)
}
}
// Process all the metrics added to the metricset.
for key, ms := range ms {
event := baseEvent.CloneVT()

Expand Down Expand Up @@ -378,3 +394,22 @@ func (ms metricsets) upsertOne(timestamp time.Time, attributes pcommon.Map, samp
}
m.samples[sample.Name] = sample
}

// dataPointsCount reports how many data points the metrics in ms hold in
// total. Gauge, sum, histogram, exponential histogram and summary metrics
// are counted; a metric of any other type adds nothing.
func dataPointsCount(ms pmetric.MetricSlice) (count int) {
	for idx, total := 0, ms.Len(); idx < total; idx++ {
		metric := ms.At(idx)
		points := 0
		switch metric.Type() {
		case pmetric.MetricTypeSummary:
			points = metric.Summary().DataPoints().Len()
		case pmetric.MetricTypeExponentialHistogram:
			points = metric.ExponentialHistogram().DataPoints().Len()
		case pmetric.MetricTypeHistogram:
			points = metric.Histogram().DataPoints().Len()
		case pmetric.MetricTypeSum:
			points = metric.Sum().DataPoints().Len()
		case pmetric.MetricTypeGauge:
			points = metric.Gauge().DataPoints().Len()
		}
		count += points
	}
	return count
}
83 changes: 81 additions & 2 deletions input/otlp/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (

"github.com/elastic/apm-data/input/otlp"
"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/opentelemetry-lib/remappers/common"
)

func TestConsumer_ConsumeMetrics_Interface(t *testing.T) {
Expand Down Expand Up @@ -851,6 +852,83 @@ func TestConsumeMetricsDataStream(t *testing.T) {
}
}

// TestConsumeMetricsWithOTelRemapper asserts that remapped metrics are
// correctly processed by the consumer. It does not test the correctness of
// the remapping library, nor whether all metrics are remapped, as that is
// within the scope of the remapping library itself.
func TestConsumeMetricsWithOTelRemapper(t *testing.T) {
metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
sm := rm.ScopeMetrics().AppendEmpty()
ms := sm.Metrics()

// Configure scope for hostmetrics receiver
sm.Scope().SetName("otelcol/hostmetricsreceiver/load")

// Add a datapoint for a metric produced by hostmetrics receiver
ts := time.Now().UTC()
metric := ms.AppendEmpty()
metric.SetName("system.cpu.load_average.1m")
dp := metric.SetEmptyGauge().DataPoints().AppendEmpty()
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
dp.SetDoubleValue(0.7)

// Run the single-metric payload through the consumer and capture the
// resulting events, stats and result.
events, stats, results, err := transformMetrics(t, metrics)
assert.NoError(t, err)

expected := []*modelpb.APMEvent{
// First event: the original OTel metric, kept under its own name.
{
Service: &modelpb.Service{
Name: "unknown",
Language: &modelpb.Language{Name: "unknown"},
},
Agent: &modelpb.Agent{Name: "otlp", Version: "unknown"},
Timestamp: modelpb.FromTime(ts),
Metricset: &modelpb.Metricset{
Name: "app",
Samples: []*modelpb.MetricsetSample{
{
Name: "system.cpu.load_average.1m",
Type: modelpb.MetricType_METRIC_TYPE_GAUGE,
Value: 0.7,
},
},
},
},
// Second event: the remapped Elastic metrics, identified by the
// remapper's event.module label.
{
Service: &modelpb.Service{
Name: "unknown",
Language: &modelpb.Language{Name: "unknown"},
},
Agent: &modelpb.Agent{Name: "otlp", Version: "unknown"},
Timestamp: modelpb.FromTime(ts),
Metricset: &modelpb.Metricset{
Name: "app",
Samples: []*modelpb.MetricsetSample{
{
Name: "system.load.1",
Type: modelpb.MetricType_METRIC_TYPE_GAUGE,
Value: 0.7,
},
// Value is left at its zero value for the 5 and 15 samples; the
// input above only carries the 1m load average.
{
Name: "system.load.5",
Type: modelpb.MetricType_METRIC_TYPE_GAUGE,
},
{
Name: "system.load.15",
Type: modelpb.MetricType_METRIC_TYPE_GAUGE,
},
},
},
Labels: map[string]*modelpb.LabelValue{
"event.module": &modelpb.LabelValue{Value: common.RemapperEventModule},
},
},
}
eventsMatch(t, expected, events)
// Stats and result are both expected to be zero-valued.
assert.Equal(t, otlp.ConsumerStats{}, stats)
assert.Equal(t, otlp.ConsumeMetricsResult{}, results)
}

/* TODO
func TestMetricsLogging(t *testing.T) {
for _, level := range []logp.Level{logp.InfoLevel, logp.DebugLevel} {
Expand All @@ -873,8 +951,9 @@ func transformMetrics(t *testing.T, metrics pmetric.Metrics) ([]*modelpb.APMEven
recorder := batchRecorderBatchProcessor(&batches)

consumer := otlp.NewConsumer(otlp.ConsumerConfig{
Processor: recorder,
Semaphore: semaphore.NewWeighted(100),
Processor: recorder,
Semaphore: semaphore.NewWeighted(100),
RemapOTelMetrics: true,
})
result, err := consumer.ConsumeMetricsWithResult(context.Background(), metrics)
require.Len(t, batches, 1)
Expand Down
17 changes: 13 additions & 4 deletions model/modelprocessor/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"

"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/opentelemetry-lib/remappers/common"
)

const (
Expand Down Expand Up @@ -141,10 +142,18 @@ func metricsetDataset(event *modelpb.APMEvent) string {
// metrics that we don't already know about; otherwise they will end
// up creating service-specific data streams.
internal := true
for _, s := range event.Metricset.Samples {
if !IsInternalMetricName(s.Name) {
internal = false
break

// set internal to false for metrics translated using OTel remappers.
lahsivjar marked this conversation as resolved.
Show resolved Hide resolved
if label, ok := event.Labels["event.module"]; ok && label != nil {
internal = !(label.Value == common.RemapperEventModule)
}

if internal {
for _, s := range event.Metricset.Samples {
if !IsInternalMetricName(s.Name) {
internal = false
break
}
}
}
if internal {
Expand Down
29 changes: 29 additions & 0 deletions model/modelprocessor/datastream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/apm-data/model/modelprocessor"
"github.com/elastic/opentelemetry-lib/remappers/common"
)

func TestSetDataStream(t *testing.T) {
Expand Down Expand Up @@ -196,6 +197,34 @@ func TestSetDataStream(t *testing.T) {
},
},
output: &modelpb.DataStream{Type: "metrics", Dataset: "apm.internal", Namespace: "custom"},
}, {
input: &modelpb.APMEvent{
Agent: &modelpb.Agent{Name: "otel"},
Service: &modelpb.Service{Name: "service-name"},
Metricset: &modelpb.Metricset{
Samples: []*modelpb.MetricsetSample{
{Name: "system.memory.total"},
},
},
Labels: map[string]*modelpb.LabelValue{
"event.module": &modelpb.LabelValue{Value: common.RemapperEventModule}, // otel translated hostmetrics
},
},
output: &modelpb.DataStream{Type: "metrics", Dataset: "apm.app.service_name", Namespace: "custom"},
}, {
input: &modelpb.APMEvent{
Agent: &modelpb.Agent{Name: "otel"},
Service: &modelpb.Service{Name: "service-name"},
Metricset: &modelpb.Metricset{
Samples: []*modelpb.MetricsetSample{
{Name: "system.memory.total"},
},
},
Labels: map[string]*modelpb.LabelValue{
"event.provider": &modelpb.LabelValue{Value: "kernel"},
},
},
output: &modelpb.DataStream{Type: "metrics", Dataset: "apm.internal", Namespace: "custom"},
}}

for _, test := range tests {
Expand Down