Skip to content

Commit

Permalink
fix: copy prometheusexporter from otel collector contrib repo (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-mwangi authored Jul 14, 2022
1 parent 1a39dd5 commit 9696174
Show file tree
Hide file tree
Showing 23 changed files with 4,274 additions and 2 deletions.
1 change: 1 addition & 0 deletions exporter/prometheusexporter/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
48 changes: 48 additions & 0 deletions exporter/prometheusexporter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Prometheus Exporter

**IMPORTANT:** This component is copied from https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.54.0/exporter/prometheusexporter and
adapted to avoid logging prometheus metrics errors that were not previously logged.

| Status | |
| ------------------------ |-------------------|
| Stability | [beta] |
| Supported pipeline types | metrics |
| Distributions | [core], [contrib] |

Exports data in the [Prometheus format](https://prometheus.io/docs/concepts/data_model/), which allows it to be scraped by a [Prometheus](https://prometheus.io/) server.

## Getting Started

The following settings are required:

- `endpoint` (no default): the address on which metrics will be exposed by the Prometheus scrape handler.

The following settings can be optionally configured:

- `const_labels` (no default): key/values that are applied for every exported metric.
- `namespace` (no default): if set, exports metrics under the provided value.
- `send_timestamps` (default = `false`): if true, sends the timestamp of the underlying
metric sample in the response.
- `metric_expiration` (default = `5m`): defines how long metrics are exposed without updates.
- `resource_to_telemetry_conversion`
- `enabled` (default = false): If `enabled` is `true`, all the resource attributes will be converted to metric labels by default.

Example:

```yaml
exporters:
prometheus:
endpoint: "1.2.3.4:1234"
namespace: test-space
const_labels:
label1: value1
"another label": spaced value
send_timestamps: true
metric_expiration: 180m
resource_to_telemetry_conversion:
enabled: true
```
[beta]:https://github.com/open-telemetry/opentelemetry-collector#beta
[contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
[core]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol
319 changes: 319 additions & 0 deletions exporter/prometheusexporter/accumulator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,319 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package prometheusexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter"

import (
	"fmt"
	"sort"
	"strings"
	"sync"
	"time"

	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.uber.org/zap"
)

// accumulatedValue is the stored state for a single time series: the most
// recently accepted datapoint plus the metadata needed to export it.
type accumulatedValue struct {
	// value contains a metric with exactly one aggregated datapoint.
	value pmetric.Metric

	// resourceAttrs contain the resource attributes. They are used to output instance and job labels.
	resourceAttrs pcommon.Map

	// updated indicates when metric was last changed.
	updated time.Time

	// scope is the instrumentation scope the metric was reported under.
	scope pcommon.InstrumentationScope
}

// accumulator stores aggregated values of incoming metrics.
type accumulator interface {
	// Accumulate stores aggregated metric values.
	Accumulate(resourceMetrics pmetric.ResourceMetrics) (processed int)
	// Collect returns a slice with relevant aggregated metrics and their resource attributes.
	// The number of metrics and attributes returned will be the same.
	Collect() (metrics []pmetric.Metric, resourceAttrs []pcommon.Map)
}

// lastValueAccumulator keeps the last value for each accumulated time series.
type lastValueAccumulator struct {
	logger *zap.Logger

	// registeredMetrics maps a time-series signature (see timeseriesSignature)
	// to an *accumulatedValue.
	registeredMetrics sync.Map

	// metricExpiration contains duration for which metric
	// should be served after it was updated
	metricExpiration time.Duration
}

// newAccumulator constructs a lastValueAccumulator that keeps serving each
// metric for metricExpiration after its most recent update.
func newAccumulator(logger *zap.Logger, metricExpiration time.Duration) accumulator {
	acc := &lastValueAccumulator{
		logger:           logger,
		metricExpiration: metricExpiration,
	}
	return acc
}

// Accumulate stores one datapoint per metric and returns the number of
// datapoints that were accepted across all scope metrics in rm.
func (a *lastValueAccumulator) Accumulate(rm pmetric.ResourceMetrics) (n int) {
	now := time.Now()
	attrs := rm.Resource().Attributes()
	scopeMetrics := rm.ScopeMetrics()

	for i := 0; i < scopeMetrics.Len(); i++ {
		sm := scopeMetrics.At(i)
		ms := sm.Metrics()
		for j := 0; j < ms.Len(); j++ {
			n += a.addMetric(ms.At(j), sm.Scope(), attrs, now)
		}
	}

	return n
}

// addMetric dispatches a single metric to the accumulator for its data type
// and returns the number of datapoints accepted. Metrics with an unsupported
// data type are dropped with an error log.
func (a *lastValueAccumulator) addMetric(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) int {
	a.logger.Debug(fmt.Sprintf("accumulating metric: %s", metric.Name()))

	switch metric.DataType() {
	case pmetric.MetricDataTypeGauge:
		return a.accumulateGauge(metric, il, resourceAttrs, now)
	case pmetric.MetricDataTypeSum:
		return a.accumulateSum(metric, il, resourceAttrs, now)
	case pmetric.MetricDataTypeHistogram:
		return a.accumulateDoubleHistogram(metric, il, resourceAttrs, now)
	case pmetric.MetricDataTypeSummary:
		return a.accumulateSummary(metric, il, resourceAttrs, now)
	default:
		// BUG fix: string(metric.DataType()) converts the enum's integer value
		// to a single rune (flagged by go vet's stringintconv check); use the
		// String() method so the log carries the readable type name.
		a.logger.With(
			zap.String("data_type", metric.DataType().String()),
			zap.String("metric_name", metric.Name()),
		).Error("failed to translate metric")
	}

	return 0
}

// accumulateSummary records the latest summary datapoint per time series.
// A datapoint carrying the NoRecordedValue (staleness) flag retires its
// series. Returns the number of datapoints accepted.
func (a *lastValueAccumulator) accumulateSummary(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
	dps := metric.Summary().DataPoints()
	for i := 0; i < dps.Len(); i++ {
		ip := dps.At(i)

		signature := timeseriesSignature(il.Name(), metric, ip.Attributes())
		if ip.Flags().HasFlag(pmetric.MetricDataPointFlagNoRecordedValue) {
			a.registeredMetrics.Delete(signature)
			// BUG fix: was `return 0`, which aborted the whole metric on a
			// staleness marker, discarding the remaining datapoints and the
			// count accumulated so far; only this one series should retire.
			continue
		}

		v, ok := a.registeredMetrics.Load(signature)
		stalePoint := ok &&
			ip.Timestamp().AsTime().Before(v.(*accumulatedValue).value.Summary().DataPoints().At(0).Timestamp().AsTime())

		if stalePoint {
			// Only keep this datapoint if it has a later timestamp.
			continue
		}

		mm := createMetric(metric)
		ip.CopyTo(mm.Summary().DataPoints().AppendEmpty())
		a.registeredMetrics.Store(signature, &accumulatedValue{value: mm, resourceAttrs: resourceAttrs, scope: il, updated: now})
		n++
	}

	return n
}

// accumulateGauge records the latest gauge datapoint per time series. A
// datapoint carrying the NoRecordedValue (staleness) flag retires its series.
// Returns the number of datapoints accepted.
func (a *lastValueAccumulator) accumulateGauge(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
	dps := metric.Gauge().DataPoints()
	for i := 0; i < dps.Len(); i++ {
		ip := dps.At(i)

		signature := timeseriesSignature(il.Name(), metric, ip.Attributes())
		if ip.Flags().HasFlag(pmetric.MetricDataPointFlagNoRecordedValue) {
			a.registeredMetrics.Delete(signature)
			// BUG fix: was `return 0`, which aborted the whole metric on a
			// staleness marker, discarding the remaining datapoints and the
			// count accumulated so far; only this one series should retire.
			continue
		}

		v, ok := a.registeredMetrics.Load(signature)
		if !ok {
			// First observation of this series.
			m := createMetric(metric)
			ip.CopyTo(m.Gauge().DataPoints().AppendEmpty())
			a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
			n++
			continue
		}
		mv := v.(*accumulatedValue)

		if ip.Timestamp().AsTime().Before(mv.value.Gauge().DataPoints().At(0).Timestamp().AsTime()) {
			// only keep datapoint with latest timestamp
			continue
		}

		m := createMetric(metric)
		ip.CopyTo(m.Gauge().DataPoints().AppendEmpty())
		a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
		n++
	}
	return
}

// accumulateSum records the latest sum datapoint per time series, folding
// monotonic delta sums into the previously stored cumulative point. Sums with
// unspecified temporality, and non-monotonic delta sums, are dropped. A
// datapoint carrying the NoRecordedValue (staleness) flag retires its series.
// Returns the number of datapoints accepted.
func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
	doubleSum := metric.Sum()

	// Drop metrics with unspecified aggregations
	if doubleSum.AggregationTemporality() == pmetric.MetricAggregationTemporalityUnspecified {
		return
	}

	// Drop non-monotonic delta metrics: they cannot be converted to cumulative.
	if doubleSum.AggregationTemporality() == pmetric.MetricAggregationTemporalityDelta && !doubleSum.IsMonotonic() {
		return
	}

	dps := doubleSum.DataPoints()
	for i := 0; i < dps.Len(); i++ {
		ip := dps.At(i)

		signature := timeseriesSignature(il.Name(), metric, ip.Attributes())
		if ip.Flags().HasFlag(pmetric.MetricDataPointFlagNoRecordedValue) {
			a.registeredMetrics.Delete(signature)
			// BUG fix: was `return 0`, which aborted the whole metric on a
			// staleness marker, discarding the remaining datapoints and the
			// count accumulated so far; only this one series should retire.
			continue
		}

		v, ok := a.registeredMetrics.Load(signature)
		if !ok {
			// First observation of this series; store it as cumulative.
			m := createMetric(metric)
			m.Sum().SetIsMonotonic(metric.Sum().IsMonotonic())
			m.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative)
			ip.CopyTo(m.Sum().DataPoints().AppendEmpty())
			a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
			n++
			continue
		}
		mv := v.(*accumulatedValue)

		if ip.Timestamp().AsTime().Before(mv.value.Sum().DataPoints().At(0).Timestamp().AsTime()) {
			// only keep datapoint with latest timestamp
			continue
		}

		// Delta-to-Cumulative: add the incoming delta onto the stored total.
		// NOTE(review): the guard compares the incoming StartTimestamp against
		// the stored point's StartTimestamp and then re-sets it to that same
		// value (a no-op), so only points sharing the same interval start are
		// merged — confirm this is the intended continuity check.
		if doubleSum.AggregationTemporality() == pmetric.MetricAggregationTemporalityDelta && ip.StartTimestamp() == mv.value.Sum().DataPoints().At(0).StartTimestamp() {
			ip.SetStartTimestamp(mv.value.Sum().DataPoints().At(0).StartTimestamp())
			switch ip.ValueType() {
			case pmetric.NumberDataPointValueTypeInt:
				ip.SetIntVal(ip.IntVal() + mv.value.Sum().DataPoints().At(0).IntVal())
			case pmetric.NumberDataPointValueTypeDouble:
				ip.SetDoubleVal(ip.DoubleVal() + mv.value.Sum().DataPoints().At(0).DoubleVal())
			}
		}

		m := createMetric(metric)
		m.Sum().SetIsMonotonic(metric.Sum().IsMonotonic())
		m.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative)
		ip.CopyTo(m.Sum().DataPoints().AppendEmpty())
		a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
		n++
	}
	return
}

// accumulateDoubleHistogram records the latest histogram datapoint per time
// series. Histograms whose aggregation temporality is not cumulative are
// dropped. A datapoint carrying the NoRecordedValue (staleness) flag retires
// its series. Returns the number of datapoints accepted.
func (a *lastValueAccumulator) accumulateDoubleHistogram(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
	doubleHistogram := metric.Histogram()

	// Drop metrics with non-cumulative aggregations
	if doubleHistogram.AggregationTemporality() != pmetric.MetricAggregationTemporalityCumulative {
		return
	}

	dps := doubleHistogram.DataPoints()
	for i := 0; i < dps.Len(); i++ {
		ip := dps.At(i)

		signature := timeseriesSignature(il.Name(), metric, ip.Attributes())
		if ip.Flags().HasFlag(pmetric.MetricDataPointFlagNoRecordedValue) {
			a.registeredMetrics.Delete(signature)
			// BUG fix: was `return 0`, which aborted the whole metric on a
			// staleness marker, discarding the remaining datapoints and the
			// count accumulated so far; only this one series should retire.
			continue
		}

		v, ok := a.registeredMetrics.Load(signature)
		if !ok {
			// First observation of this series.
			m := createMetric(metric)
			ip.CopyTo(m.Histogram().DataPoints().AppendEmpty())
			a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
			n++
			continue
		}
		mv := v.(*accumulatedValue)

		if ip.Timestamp().AsTime().Before(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) {
			// only keep datapoint with latest timestamp
			continue
		}

		m := createMetric(metric)
		ip.CopyTo(m.Histogram().DataPoints().AppendEmpty())
		m.Histogram().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative)
		a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
		n++
	}
	return
}

// Collect returns the aggregated metrics that have not yet expired, together
// with the resource attributes recorded for each one; the two slices are
// parallel. Entries older than metricExpiration are deleted as they are seen.
func (a *lastValueAccumulator) Collect() ([]pmetric.Metric, []pcommon.Map) {
	a.logger.Debug("Accumulator collect called")

	cutoff := time.Now().Add(-a.metricExpiration)

	var (
		metrics       []pmetric.Metric
		resourceAttrs []pcommon.Map
	)
	a.registeredMetrics.Range(func(key, value interface{}) bool {
		av := value.(*accumulatedValue)
		if cutoff.After(av.updated) {
			// Last update predates the expiration window; drop the series.
			a.logger.Debug(fmt.Sprintf("metric expired: %s", av.value.Name()))
			a.registeredMetrics.Delete(key)
		} else {
			metrics = append(metrics, av.value)
			resourceAttrs = append(resourceAttrs, av.resourceAttrs)
		}
		return true
	})

	return metrics, resourceAttrs
}

// timeseriesSignature builds the registry key for a time series from its data
// type, scope name, metric name, and attribute set. Attribute pairs are
// collected and sorted locally so the result is deterministic.
//
// BUG fix: the original called attributes.Sort(), which sorts the caller's
// attribute map in place — a surprising side effect for a read-only helper
// that is invoked on live datapoint attributes.
func timeseriesSignature(ilmName string, metric pmetric.Metric, attributes pcommon.Map) string {
	attrs := make([]string, 0, attributes.Len())
	attributes.Range(func(k string, v pcommon.Value) bool {
		attrs = append(attrs, "*"+k+"*"+v.AsString())
		return true
	})
	sort.Strings(attrs)

	var b strings.Builder
	b.WriteString(metric.DataType().String())
	b.WriteString("*" + ilmName)
	b.WriteString("*" + metric.Name())
	for _, kv := range attrs {
		b.WriteString(kv)
	}
	return b.String()
}

// createMetric returns a fresh metric that copies the prototype's name,
// description, unit, and data type; datapoints are not carried over.
func createMetric(metric pmetric.Metric) pmetric.Metric {
	out := pmetric.NewMetric()
	out.SetDataType(metric.DataType())
	out.SetName(metric.Name())
	out.SetDescription(metric.Description())
	out.SetUnit(metric.Unit())
	return out
}
Loading

0 comments on commit 9696174

Please sign in to comment.