Skip to content

Commit

Permalink
Remap otel hostmetrics to elastic metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
lahsivjar committed May 9, 2024
1 parent 9bbda71 commit c87ce4d
Show file tree
Hide file tree
Showing 8 changed files with 523 additions and 9 deletions.
17 changes: 14 additions & 3 deletions input/otlp/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,18 @@ import (
"sync/atomic"

"github.com/elastic/apm-data/input"
"github.com/elastic/apm-data/input/otlp/hostmetrics"
"github.com/elastic/apm-data/model/modelpb"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)

// remapper is the consumer-side contract for components that derive
// additional metrics from the metrics of a single instrumentation scope.
// hostmetrics.Remapper is the implementation installed by NewConsumer.
type remapper interface {
	// Remap inspects the metrics carried by scope and writes any derived
	// metrics into out.
	Remap(scope pmetric.ScopeMetrics, out pmetric.MetricSlice)

	// Valid reports whether this remapper applies to the given scope.
	Valid(scope pmetric.ScopeMetrics) bool
}

// ConsumerConfig holds configuration for Consumer.
type ConsumerConfig struct {
// Logger holds a logger for the consumer. If this is nil, then
Expand All @@ -44,9 +51,10 @@ type ConsumerConfig struct {
// Consumer transforms OpenTelemetry data to the Elastic APM data model,
// sending each payload as a batch to the configured BatchProcessor.
type Consumer struct {
// sem is populated from ConsumerConfig.Semaphore by NewConsumer.
sem input.Semaphore
// config is the configuration that was passed to NewConsumer.
config ConsumerConfig
// stats holds the consumer's counters.
// NOTE(review): consumerStats is declared outside this view — confirm its contents there.
stats consumerStats
sem input.Semaphore
config ConsumerConfig
stats consumerStats
// remappers lists the metric remappers owned by this consumer; NewConsumer
// installs the hostmetrics remapper here.
remappers []remapper
}

// NewConsumer returns a new Consumer with the given configuration.
Expand All @@ -59,6 +67,9 @@ func NewConsumer(config ConsumerConfig) *Consumer {
return &Consumer{
config: config,
sem: config.Semaphore,
remappers: []remapper{
hostmetrics.NewRemapper(config.Logger),
},
}
}

Expand Down
184 changes: 184 additions & 0 deletions input/otlp/hostmetrics/cpu.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package hostmetrics

import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

// remapCPUMetrics derives Elastic system-integration CPU metrics from the
// OTel hostmetrics CPU scraper metrics in src and hands them to addMetrics
// together with out and dataset.
//
// Two source metrics are consumed:
//   - "system.cpu.logical.count" (Sum): its first data point supplies the
//     logical core count and the timestamp used for every emitted metric.
//   - "system.cpu.utilization" (Gauge): each data point is bucketed by its
//     "state" attribute; values sharing a state are summed. Every state
//     except "idle" also contributes to the total.
//
// The per-state "system.cpu.*.pct" gauges are always emitted.
// "system.cpu.cores" and the "system.cpu.*.norm.pct" gauges are emitted only
// when a positive core count was observed, because normalising by zero cores
// would otherwise produce NaN/Inf values. Metrics with an unexpected type or
// without data points are ignored rather than dereferenced.
func remapCPUMetrics(src, out pmetric.MetricSlice, dataset string) {
	var (
		timestamp, utilTimestamp pcommon.Timestamp
		numCores                 int64

		totalPercent, idlePercent, systemPercent, userPercent, stealPercent,
		iowaitPercent, nicePercent, irqPercent, softirqPercent float64
	)

	// Iterate all metrics in the current scope and accumulate the inputs for
	// the additional Elastic system integration metrics.
	for i := 0; i < src.Len(); i++ {
		m := src.At(i)
		switch m.Name() {
		case "system.cpu.logical.count":
			// Guard the typed accessor and At(0): a metric of another type or
			// with no data points must not crash the remapper.
			if m.Type() != pmetric.MetricTypeSum || m.Sum().DataPoints().Len() == 0 {
				continue
			}
			dp := m.Sum().DataPoints().At(0)
			numCores = dp.IntValue()
			timestamp = dp.Timestamp()

		case "system.cpu.utilization":
			if m.Type() != pmetric.MetricTypeGauge {
				continue
			}
			dataPoints := m.Gauge().DataPoints()
			for j := 0; j < dataPoints.Len(); j++ {
				dp := dataPoints.At(j)
				// Remember the first utilization timestamp as a fallback for
				// when the logical-count metric is missing.
				if utilTimestamp == 0 {
					utilTimestamp = dp.Timestamp()
				}

				state, ok := dp.Attributes().Get("state")
				if !ok {
					continue
				}
				value := dp.DoubleValue()

				switch state.Str() {
				case "idle":
					// Idle time is deliberately excluded from the total.
					idlePercent += value
				case "system":
					systemPercent += value
					totalPercent += value
				case "user":
					userPercent += value
					totalPercent += value
				case "steal":
					stealPercent += value
					totalPercent += value
				case "wait":
					iowaitPercent += value
					totalPercent += value
				case "nice":
					nicePercent += value
					totalPercent += value
				case "interrupt":
					irqPercent += value
					totalPercent += value
				case "softirq":
					softirqPercent += value
					totalPercent += value
				}
			}
		}
	}

	// The logical-count timestamp wins when present; otherwise fall back to
	// the utilization timestamp so emitted metrics are not stamped with zero.
	if timestamp == 0 {
		timestamp = utilTimestamp
	}

	// gauge builds a double-valued gauge metric sharing the common timestamp.
	gauge := func(name string, value *float64) metric {
		return metric{
			dataType:    pmetric.MetricTypeGauge,
			name:        name,
			timestamp:   timestamp,
			doubleValue: value,
		}
	}

	hasCores := numCores > 0
	metrics := make([]metric, 0, 19)

	if hasCores {
		metrics = append(metrics, metric{
			dataType:  pmetric.MetricTypeSum,
			name:      "system.cpu.cores",
			timestamp: timestamp,
			intValue:  &numCores,
		})
	}

	metrics = append(metrics,
		gauge("system.cpu.total.pct", &totalPercent),
		gauge("system.cpu.idle.pct", &idlePercent),
		gauge("system.cpu.system.pct", &systemPercent),
		gauge("system.cpu.user.pct", &userPercent),
		gauge("system.cpu.steal.pct", &stealPercent),
		gauge("system.cpu.wait.pct", &iowaitPercent),
		gauge("system.cpu.nice.pct", &nicePercent),
		gauge("system.cpu.irq.pct", &irqPercent),
		gauge("system.cpu.softirq.pct", &softirqPercent),
	)

	if hasCores {
		// Normalised values are the raw sums divided by the core count.
		cores := float64(numCores)
		totalNorm := totalPercent / cores
		idleNorm := idlePercent / cores
		systemNorm := systemPercent / cores
		userNorm := userPercent / cores
		stealNorm := stealPercent / cores
		iowaitNorm := iowaitPercent / cores
		niceNorm := nicePercent / cores
		irqNorm := irqPercent / cores
		softirqNorm := softirqPercent / cores

		metrics = append(metrics,
			gauge("system.cpu.total.norm.pct", &totalNorm),
			gauge("system.cpu.idle.norm.pct", &idleNorm),
			gauge("system.cpu.system.norm.pct", &systemNorm),
			gauge("system.cpu.user.norm.pct", &userNorm),
			gauge("system.cpu.steal.norm.pct", &stealNorm),
			gauge("system.cpu.wait.norm.pct", &iowaitNorm),
			gauge("system.cpu.nice.norm.pct", &niceNorm),
			gauge("system.cpu.irq.norm.pct", &irqNorm),
			gauge("system.cpu.softirq.norm.pct", &softirqNorm),
		)
	}

	addMetrics(out, dataset, metrics...)
}
63 changes: 63 additions & 0 deletions input/otlp/hostmetrics/hostmetrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package hostmetrics

import (
"path"
"strings"

"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)

// scraperToElasticDataset maps a hostmetrics scraper name (the last path
// element of the scope name, as computed in Remap) to the Elastic dataset
// name passed to the per-scraper remap functions.
var scraperToElasticDataset = map[string]string{
"cpu": "system.cpu",
"disk": "system.diskio",
"filesystem": "system.filesystem",
"load": "system.load",
"memory": "system.memory",
"network": "system.network",
// paging shares the memory dataset.
"paging": "system.memory",
// Both the "processes" and "process" scrapers share one dataset.
"processes": "system.process",
"process": "system.process",
}

// Remapper remaps metrics from scopes whose name starts with
// "otelcol/hostmetricsreceiver" (see Valid) into Elastic system metrics.
type Remapper struct {
// logger receives warnings for scrapers that have no dataset or no remap
// function.
logger *zap.Logger
}

// NewRemapper returns a Remapper that reports warnings through logger.
//
// A nil logger is replaced with a no-op logger so that Remap can log
// unconditionally without dereferencing a nil *zap.Logger.
func NewRemapper(logger *zap.Logger) *Remapper {
	if logger == nil {
		logger = zap.NewNop()
	}
	return &Remapper{logger: logger}
}

// Remap derives Elastic system metrics from the hostmetrics scope src and
// passes out to the scraper-specific remap function.
//
// Scopes rejected by Valid are ignored silently. The scraper name is the last
// path element of the scope name; a scraper with no entry in
// scraperToElasticDataset, or with an entry but no remap function, only
// produces a warning log.
func (r *Remapper) Remap(src pmetric.ScopeMetrics, out pmetric.MetricSlice) {
	if !r.Valid(src) {
		return
	}

	scopeName := src.Scope().Name()
	scraperName := path.Base(scopeName)

	elasticDataset, known := scraperToElasticDataset[scraperName]
	if !known {
		r.logger.Warn("no dataset defined for scraper", zap.String("scraper", scraperName))
		return
	}

	// Dispatch to the scraper-specific remap function.
	switch scopeMetrics := src.Metrics(); scraperName {
	case "cpu":
		remapCPUMetrics(scopeMetrics, out, elasticDataset)
	case "memory":
		remapMemoryMetrics(scopeMetrics, out, elasticDataset)
	case "load":
		remapLoadMetrics(scopeMetrics, out, elasticDataset)
	case "process":
		remapProcessMetrics(scopeMetrics, out, elasticDataset)
	default:
		r.logger.Warn("no remapper found for scope", zap.String("scope", scopeName))
	}
}

// Valid reports whether sm belongs to the hostmetrics receiver, i.e. whether
// its scope name begins with "otelcol/hostmetricsreceiver".
func (r *Remapper) Valid(sm pmetric.ScopeMetrics) bool {
	const hostmetricsScopePrefix = "otelcol/hostmetricsreceiver"

	scopeName := sm.Scope().Name()
	return strings.HasPrefix(scopeName, hostmetricsScopePrefix)
}
47 changes: 47 additions & 0 deletions input/otlp/hostmetrics/load.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package hostmetrics

import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

// remapLoadMetrics derives the Elastic "system.load.{1,5,15}" gauges from
// the OTel "system.cpu.load_average.{1m,5m,15m}" gauges in src and hands them
// to addMetrics together with out and dataset.
//
// For each source metric only the first data point is read. Metrics that are
// not gauges or that carry no data points are ignored instead of being
// dereferenced. All three output metrics are always emitted; a load average
// missing from src is reported as 0.
//
// The shared timestamp comes from the 1m metric when it is present, otherwise
// from the first other load average seen, so the output is not stamped with a
// zero timestamp merely because the 1m metric is absent.
func remapLoadMetrics(src, out pmetric.MetricSlice, dataset string) {
	var timestamp pcommon.Timestamp
	var l1, l5, l15 float64

	for i := 0; i < src.Len(); i++ {
		m := src.At(i)

		// Select the destination for this metric's value; skip unrelated metrics.
		var target *float64
		switch m.Name() {
		case "system.cpu.load_average.1m":
			target = &l1
		case "system.cpu.load_average.5m":
			target = &l5
		case "system.cpu.load_average.15m":
			target = &l15
		default:
			continue
		}

		// Guard the typed accessor and At(0) against malformed input.
		if m.Type() != pmetric.MetricTypeGauge || m.Gauge().DataPoints().Len() == 0 {
			continue
		}
		dp := m.Gauge().DataPoints().At(0)
		*target = dp.DoubleValue()

		// The 1m timestamp is authoritative; others only fill in when nothing
		// has been recorded yet.
		if target == &l1 || timestamp == 0 {
			timestamp = dp.Timestamp()
		}
	}

	addMetrics(out, dataset,
		metric{
			dataType:    pmetric.MetricTypeGauge,
			name:        "system.load.1",
			timestamp:   timestamp,
			doubleValue: &l1,
		},
		metric{
			dataType:    pmetric.MetricTypeGauge,
			name:        "system.load.5",
			timestamp:   timestamp,
			doubleValue: &l5,
		},
		metric{
			dataType:    pmetric.MetricTypeGauge,
			name:        "system.load.15",
			timestamp:   timestamp,
			doubleValue: &l15,
		},
	)
}
Loading

0 comments on commit c87ce4d

Please sign in to comment.