Skip to content

Commit

Permalink
filter metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
dmathieu committed Aug 23, 2023
1 parent b5081b3 commit 1a424e4
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 3 deletions.
18 changes: 18 additions & 0 deletions internal/telemetry/metric_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func NewMetricExporter(p modelpb.BatchProcessor, opts ...ConfigOption) *MetricEx
return &MetricExporter{
processor: p,

metricFilter: cfg.MetricFilter,
temporalitySelector: cfg.TemporalitySelector,
aggregationSelector: cfg.AggregationSelector,
}
Expand All @@ -49,6 +50,7 @@ func NewMetricExporter(p modelpb.BatchProcessor, opts ...ConfigOption) *MetricEx
type MetricExporter struct {
processor modelpb.BatchProcessor

metricFilter []string
temporalitySelector metric.TemporalitySelector
aggregationSelector metric.AggregationSelector
}
Expand Down Expand Up @@ -84,6 +86,10 @@ func (e *MetricExporter) Export(ctx context.Context, rm *metricdata.ResourceMetr
for _, scopeMetrics := range rm.ScopeMetrics {
ms := make(metricsets)
for _, sm := range scopeMetrics.Metrics {
if e.isMetricFiltered(sm.Name) {
continue
}

if err := addMetric(sm, ms); err != nil {
return err
}
Expand Down Expand Up @@ -117,10 +123,22 @@ func (e *MetricExporter) Export(ctx context.Context, rm *metricdata.ResourceMetr
batch = append(batch, event)
}
}
if len(batch) == 0 {
return nil
}

return e.processor.ProcessBatch(ctx, &batch)
}

func (e *MetricExporter) isMetricFiltered(n string) bool {
for _, v := range e.metricFilter {
if v == n {
return false
}
}
return true
}

func addMetric(sm metricdata.Metrics, ms metricsets) error {
switch m := sm.Data.(type) {
case metricdata.Histogram[int64]:
Expand Down
12 changes: 12 additions & 0 deletions internal/telemetry/metric_exporter_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var customHistogramBoundaries = []float64{
}

type Config struct {
MetricFilter []string
TemporalitySelector metric.TemporalitySelector
AggregationSelector metric.AggregationSelector
}
Expand Down Expand Up @@ -74,6 +75,17 @@ func defaultAggregationSelector(ik metric.InstrumentKind) aggregation.Aggregatio
}
}

// WithMetricFilter configured the metrics filter. Any metric filtered here
// will be the only ones to be exported. All other metrics will be ignored.
//
// Defaults to not exporting anything.
func WithMetricFilter(f []string) ConfigOption {
return func(cfg Config) Config {
cfg.MetricFilter = f
return cfg
}
}

// WithAggregationSelector configure the Aggregation Selector the exporter will
// use. If no AggregationSelector is provided the DefaultAggregationSelector is
// used.
Expand Down
34 changes: 31 additions & 3 deletions internal/telemetry/metric_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,24 @@ func TestMetricExporter(t *testing.T) {
for _, tt := range []struct {
name string

recordMetrics func(ctx context.Context, meter metric.Meter)
expectedBatch modelpb.Batch
exporterConfig []ConfigOption
recordMetrics func(ctx context.Context, meter metric.Meter)
expectedBatch modelpb.Batch
}{
{
name: "with a filtered metric",
recordMetrics: func(ctx context.Context, meter metric.Meter) {
counter, err := meter.Int64Counter("filtered_metric")
assert.NoError(t, err)
counter.Add(ctx, 1)
},
expectedBatch: modelpb.Batch(nil),
},
{
name: "with an int64 histogram",
exporterConfig: []ConfigOption{
WithMetricFilter([]string{"histogram_metric"}),
},
recordMetrics: func(ctx context.Context, meter metric.Meter) {
counter, err := meter.Int64Histogram("histogram_metric")
assert.NoError(t, err)
Expand Down Expand Up @@ -100,6 +113,9 @@ func TestMetricExporter(t *testing.T) {
},
{
name: "with a float64 histogram",
exporterConfig: []ConfigOption{
WithMetricFilter([]string{"histogram_metric"}),
},
recordMetrics: func(ctx context.Context, meter metric.Meter) {
counter, err := meter.Float64Histogram("histogram_metric")
assert.NoError(t, err)
Expand Down Expand Up @@ -183,6 +199,9 @@ func TestMetricExporter(t *testing.T) {
},
{
name: "with an int64 counter",
exporterConfig: []ConfigOption{
WithMetricFilter([]string{"sum_metric"}),
},
recordMetrics: func(ctx context.Context, meter metric.Meter) {
counter, err := meter.Int64Counter("sum_metric")
assert.NoError(t, err)
Expand All @@ -204,6 +223,9 @@ func TestMetricExporter(t *testing.T) {
},
{
name: "with a float64 counter",
exporterConfig: []ConfigOption{
WithMetricFilter([]string{"sum_metric"}),
},
recordMetrics: func(ctx context.Context, meter metric.Meter) {
counter, err := meter.Float64Counter("sum_metric")
assert.NoError(t, err)
Expand All @@ -225,6 +247,9 @@ func TestMetricExporter(t *testing.T) {
},
{
name: "with an int64 gauge",
exporterConfig: []ConfigOption{
WithMetricFilter([]string{"gauge_metric"}),
},
recordMetrics: func(ctx context.Context, meter metric.Meter) {
counter, err := meter.Int64UpDownCounter("gauge_metric")
assert.NoError(t, err)
Expand All @@ -246,6 +271,9 @@ func TestMetricExporter(t *testing.T) {
},
{
name: "with a float64 gauge",
exporterConfig: []ConfigOption{
WithMetricFilter([]string{"gauge_metric"}),
},
recordMetrics: func(ctx context.Context, meter metric.Meter) {
counter, err := meter.Float64UpDownCounter("gauge_metric")
assert.NoError(t, err)
Expand Down Expand Up @@ -274,7 +302,7 @@ func TestMetricExporter(t *testing.T) {
batch = append(batch, (*b)...)
return nil
})
e := NewMetricExporter(p)
e := NewMetricExporter(p, tt.exporterConfig...)

provider := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(sdkmetric.NewPeriodicReader(e)),
Expand Down

0 comments on commit 1a424e4

Please sign in to comment.