Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: add bytesconnector #4

Merged
merged 1 commit into from
Jan 29, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions connector/bytesconnector/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package bytesconnector // import import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/bytesconnector"

import (
"fmt"

"go.opentelemetry.io/collector/confmap"
)

// Default metrics are emitted
const (
defaultMetricNameLogs = "log.record.bytes"
defaultMetricDescLogs = "The size of log records observed (in bytes)."
)

// Config for the connector
type Config struct {
Logs map[string]MetricInfo `mapstructure:"logs"`
}

// MetricInfo for a data type
type MetricInfo struct {
Description string `mapstructure:"description"`
Attributes []AttributeConfig `mapstructure:"attributes"`
}

type AttributeConfig struct {
Key string `mapstructure:"key"`
DefaultValue any `mapstructure:"default_value"`
}

func (c *Config) Validate() error {
for name, info := range c.Logs {
if name == "" {
return fmt.Errorf("logs: metric name missing")
}
if err := info.validateAttributes(); err != nil {
return fmt.Errorf("logs attributes: metric %q: %w", name, err)
}
}
return nil
}

func (i *MetricInfo) validateAttributes() error {
for _, attr := range i.Attributes {
if attr.Key == "" {
return fmt.Errorf("attribute key missing")
}
}
return nil
}

var _ confmap.Unmarshaler = (*Config)(nil)

// Unmarshal with custom logic to set default values.
// This is necessary to ensure that default metrics are
// not configured if the user has specified any custom metrics.
func (c *Config) Unmarshal(componentParser *confmap.Conf) error {
if componentParser == nil {
// Nothing to do if there is no config given.
return nil
}
if err := componentParser.Unmarshal(c, confmap.WithIgnoreUnused()); err != nil {
return err
}
if !componentParser.IsSet("logs") {
c.Logs = defaultLogsConfig()
}
return nil
}

func defaultLogsConfig() map[string]MetricInfo {
return map[string]MetricInfo{
defaultMetricNameLogs: {
Description: defaultMetricDescLogs,
},
}
}
69 changes: 69 additions & 0 deletions connector/bytesconnector/connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package bytesconnector // import import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/bytesconnector"

import (
"context"
"errors"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/connector/bytesconnector/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
)

// count can count spans, span event, metrics, data points, or log records
// and emit the counts onto a metrics pipeline.
type count struct {
metricsConsumer consumer.Metrics
component.StartFunc
component.ShutdownFunc

logsMetricDefs map[string]metricDef[ottllog.TransformContext]
}

func (c *count) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (c *count) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
var multiError error
countMetrics := pmetric.NewMetrics()
logSizer := &plog.ProtoMarshaler{}
bytesSize := uint64(logSizer.LogsSize(ld))
countMetrics.ResourceMetrics().EnsureCapacity(ld.ResourceLogs().Len())
for i := 0; i < ld.ResourceLogs().Len(); i++ {
resourceLog := ld.ResourceLogs().At(i)
counter := newCounter[ottllog.TransformContext](c.logsMetricDefs)

for j := 0; j < resourceLog.ScopeLogs().Len(); j++ {
scopeLogs := resourceLog.ScopeLogs().At(j)

for k := 0; k < scopeLogs.LogRecords().Len(); k++ {
logRecord := scopeLogs.LogRecords().At(k)
multiError = errors.Join(multiError, counter.update(ctx, logRecord.Attributes(), bytesSize))
}
}

if len(counter.counts) == 0 {
continue // don't add an empty resource
}

countResource := countMetrics.ResourceMetrics().AppendEmpty()
resourceLog.Resource().Attributes().CopyTo(countResource.Resource().Attributes())

countResource.ScopeMetrics().EnsureCapacity(resourceLog.ScopeLogs().Len())
countScope := countResource.ScopeMetrics().AppendEmpty()
countScope.Scope().SetName(metadata.ScopeName)

counter.appendMetricsTo(countScope.Metrics())
}
if multiError != nil {
return multiError
}
return c.metricsConsumer.ConsumeMetrics(ctx, countMetrics)
}
119 changes: 119 additions & 0 deletions connector/bytesconnector/counter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package bytesconnector // import import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/bytesconnector"

import (
"context"
"errors"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
)

var noAttributes = [16]byte{}

func newCounter[K any](metricDefs map[string]metricDef[K]) *counter[K] {
return &counter[K]{
metricDefs: metricDefs,
counts: make(map[string]map[[16]byte]*attrCounter, len(metricDefs)),
timestamp: time.Now(),
}
}

type counter[K any] struct {
metricDefs map[string]metricDef[K]
counts map[string]map[[16]byte]*attrCounter
timestamp time.Time
}

type attrCounter struct {
attrs pcommon.Map
count uint64
}

func (c *counter[K]) update(ctx context.Context, attrs pcommon.Map, incrementValue uint64) error {
var multiError error
for name, md := range c.metricDefs {
countAttrs := pcommon.NewMap()
for _, attr := range md.attrs {
if attrVal, ok := attrs.Get(attr.Key); ok {
switch typeAttr := attrVal.Type(); typeAttr {
case pcommon.ValueTypeInt:
countAttrs.PutInt(attr.Key, attrVal.Int())
case pcommon.ValueTypeDouble:
countAttrs.PutDouble(attr.Key, attrVal.Double())
default:
countAttrs.PutStr(attr.Key, attrVal.Str())
}
} else if attr.DefaultValue != nil {
switch v := attr.DefaultValue.(type) {
case string:
if v != "" {
countAttrs.PutStr(attr.Key, v)
}
case int:
if v != 0 {
countAttrs.PutInt(attr.Key, int64(v))
}
case float64:
if v != 0 {
countAttrs.PutDouble(attr.Key, float64(v))
}
}
}
}

// Missing necessary attributes to be counted
if countAttrs.Len() != len(md.attrs) {
continue
}

multiError = errors.Join(multiError, c.increment(name, countAttrs, incrementValue))
continue
}
return multiError
}

func (c *counter[K]) increment(metricName string, attrs pcommon.Map, incrementValue uint64) error {
if _, ok := c.counts[metricName]; !ok {
c.counts[metricName] = make(map[[16]byte]*attrCounter)
}

key := noAttributes
if attrs.Len() > 0 {
key = pdatautil.MapHash(attrs)
}

if _, ok := c.counts[metricName][key]; !ok {
c.counts[metricName][key] = &attrCounter{attrs: attrs}
}

c.counts[metricName][key].count = incrementValue
return nil
}

func (c *counter[K]) appendMetricsTo(metricSlice pmetric.MetricSlice) {
for name, md := range c.metricDefs {
if len(c.counts[name]) == 0 {
continue
}
countMetric := metricSlice.AppendEmpty()
countMetric.SetName(name)
countMetric.SetDescription(md.desc)
sum := countMetric.SetEmptySum()
// The delta value is always positive, so a value accumulated downstream is monotonic
sum.SetIsMonotonic(true)
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
for _, dpCount := range c.counts[name] {
dp := sum.DataPoints().AppendEmpty()
dpCount.attrs.CopyTo(dp.Attributes())
dp.SetIntValue(int64(dpCount.count))
// TODO determine appropriate start time
dp.SetTimestamp(pcommon.NewTimestampFromTime(c.timestamp))
}
}
}
60 changes: 60 additions & 0 deletions connector/bytesconnector/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:generate mdatagen metadata.yaml

package bytesconnector // import import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/bytesconnector"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"

"github.com/open-telemetry/opentelemetry-collector-contrib/connector/bytesconnector/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
)

// NewFactory returns a ConnectorFactory.
func NewFactory() connector.Factory {
return connector.NewFactory(
metadata.Type,
createDefaultConfig,
connector.WithLogsToMetrics(createLogsToMetrics, metadata.LogsToMetricsStability),
)
}

// createDefaultConfig creates the default configuration.
func createDefaultConfig() component.Config {
return &Config{}
}

// createLogsToMetrics creates a logs to metrics connector based on provided config.
func createLogsToMetrics(
_ context.Context,
set connector.Settings,
cfg component.Config,
nextConsumer consumer.Metrics,
) (connector.Logs, error) {
c := cfg.(*Config)

metricDefs := make(map[string]metricDef[ottllog.TransformContext], len(c.Logs))
for name, info := range c.Logs {
md := metricDef[ottllog.TransformContext]{
desc: info.Description,
attrs: info.Attributes,
}
metricDefs[name] = md
}

return &count{
metricsConsumer: nextConsumer,
logsMetricDefs: metricDefs,
}, nil
}

type metricDef[K any] struct {
desc string
attrs []AttributeConfig
}
Loading
Loading