Skip to content

Commit

Permalink
Merge pull request #4 from axoflow/feat/bytesconnector
Browse files Browse the repository at this point in the history
Feat: add bytesconnector
  • Loading branch information
OverOrion authored Jan 29, 2025
2 parents 3db8976 + 15e8b4e commit 2abb3d3
Show file tree
Hide file tree
Showing 10 changed files with 771 additions and 0 deletions.
80 changes: 80 additions & 0 deletions connector/bytesconnector/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package bytesconnector // import import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/bytesconnector"

import (
"fmt"

"go.opentelemetry.io/collector/confmap"
)

// Default metrics are emitted
const (
defaultMetricNameLogs = "log.record.bytes"
defaultMetricDescLogs = "The size of log records observed (in bytes)."
)

// Config for the connector
type Config struct {
Logs map[string]MetricInfo `mapstructure:"logs"`
}

// MetricInfo for a data type
type MetricInfo struct {
Description string `mapstructure:"description"`
Attributes []AttributeConfig `mapstructure:"attributes"`
}

type AttributeConfig struct {
Key string `mapstructure:"key"`
DefaultValue any `mapstructure:"default_value"`
}

func (c *Config) Validate() error {
for name, info := range c.Logs {
if name == "" {
return fmt.Errorf("logs: metric name missing")
}
if err := info.validateAttributes(); err != nil {
return fmt.Errorf("logs attributes: metric %q: %w", name, err)
}
}
return nil
}

func (i *MetricInfo) validateAttributes() error {
for _, attr := range i.Attributes {
if attr.Key == "" {
return fmt.Errorf("attribute key missing")
}
}
return nil
}

var _ confmap.Unmarshaler = (*Config)(nil)

// Unmarshal with custom logic to set default values.
// This is necessary to ensure that default metrics are
// not configured if the user has specified any custom metrics.
func (c *Config) Unmarshal(componentParser *confmap.Conf) error {
if componentParser == nil {
// Nothing to do if there is no config given.
return nil
}
if err := componentParser.Unmarshal(c, confmap.WithIgnoreUnused()); err != nil {
return err
}
if !componentParser.IsSet("logs") {
c.Logs = defaultLogsConfig()
}
return nil
}

func defaultLogsConfig() map[string]MetricInfo {
return map[string]MetricInfo{
defaultMetricNameLogs: {
Description: defaultMetricDescLogs,
},
}
}
69 changes: 69 additions & 0 deletions connector/bytesconnector/connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package bytesconnector // import import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/bytesconnector"

import (
"context"
"errors"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/connector/bytesconnector/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
)

// count can count spans, span event, metrics, data points, or log records
// and emit the counts onto a metrics pipeline.
type count struct {
metricsConsumer consumer.Metrics
component.StartFunc
component.ShutdownFunc

logsMetricDefs map[string]metricDef[ottllog.TransformContext]
}

func (c *count) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (c *count) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
var multiError error
countMetrics := pmetric.NewMetrics()
logSizer := &plog.ProtoMarshaler{}
bytesSize := uint64(logSizer.LogsSize(ld))
countMetrics.ResourceMetrics().EnsureCapacity(ld.ResourceLogs().Len())
for i := 0; i < ld.ResourceLogs().Len(); i++ {
resourceLog := ld.ResourceLogs().At(i)
counter := newCounter[ottllog.TransformContext](c.logsMetricDefs)

for j := 0; j < resourceLog.ScopeLogs().Len(); j++ {
scopeLogs := resourceLog.ScopeLogs().At(j)

for k := 0; k < scopeLogs.LogRecords().Len(); k++ {
logRecord := scopeLogs.LogRecords().At(k)
multiError = errors.Join(multiError, counter.update(ctx, logRecord.Attributes(), bytesSize))
}
}

if len(counter.counts) == 0 {
continue // don't add an empty resource
}

countResource := countMetrics.ResourceMetrics().AppendEmpty()
resourceLog.Resource().Attributes().CopyTo(countResource.Resource().Attributes())

countResource.ScopeMetrics().EnsureCapacity(resourceLog.ScopeLogs().Len())
countScope := countResource.ScopeMetrics().AppendEmpty()
countScope.Scope().SetName(metadata.ScopeName)

counter.appendMetricsTo(countScope.Metrics())
}
if multiError != nil {
return multiError
}
return c.metricsConsumer.ConsumeMetrics(ctx, countMetrics)
}
119 changes: 119 additions & 0 deletions connector/bytesconnector/counter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package bytesconnector // import import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/bytesconnector"

import (
"context"
"errors"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
)

var noAttributes = [16]byte{}

func newCounter[K any](metricDefs map[string]metricDef[K]) *counter[K] {
return &counter[K]{
metricDefs: metricDefs,
counts: make(map[string]map[[16]byte]*attrCounter, len(metricDefs)),
timestamp: time.Now(),
}
}

type counter[K any] struct {
metricDefs map[string]metricDef[K]
counts map[string]map[[16]byte]*attrCounter
timestamp time.Time
}

type attrCounter struct {
attrs pcommon.Map
count uint64
}

func (c *counter[K]) update(ctx context.Context, attrs pcommon.Map, incrementValue uint64) error {
var multiError error
for name, md := range c.metricDefs {
countAttrs := pcommon.NewMap()
for _, attr := range md.attrs {
if attrVal, ok := attrs.Get(attr.Key); ok {
switch typeAttr := attrVal.Type(); typeAttr {
case pcommon.ValueTypeInt:
countAttrs.PutInt(attr.Key, attrVal.Int())
case pcommon.ValueTypeDouble:
countAttrs.PutDouble(attr.Key, attrVal.Double())
default:
countAttrs.PutStr(attr.Key, attrVal.Str())
}
} else if attr.DefaultValue != nil {
switch v := attr.DefaultValue.(type) {
case string:
if v != "" {
countAttrs.PutStr(attr.Key, v)
}
case int:
if v != 0 {
countAttrs.PutInt(attr.Key, int64(v))
}
case float64:
if v != 0 {
countAttrs.PutDouble(attr.Key, float64(v))
}
}
}
}

// Missing necessary attributes to be counted
if countAttrs.Len() != len(md.attrs) {
continue
}

multiError = errors.Join(multiError, c.increment(name, countAttrs, incrementValue))
continue
}
return multiError
}

func (c *counter[K]) increment(metricName string, attrs pcommon.Map, incrementValue uint64) error {
if _, ok := c.counts[metricName]; !ok {
c.counts[metricName] = make(map[[16]byte]*attrCounter)
}

key := noAttributes
if attrs.Len() > 0 {
key = pdatautil.MapHash(attrs)
}

if _, ok := c.counts[metricName][key]; !ok {
c.counts[metricName][key] = &attrCounter{attrs: attrs}
}

c.counts[metricName][key].count = incrementValue
return nil
}

func (c *counter[K]) appendMetricsTo(metricSlice pmetric.MetricSlice) {
for name, md := range c.metricDefs {
if len(c.counts[name]) == 0 {
continue
}
countMetric := metricSlice.AppendEmpty()
countMetric.SetName(name)
countMetric.SetDescription(md.desc)
sum := countMetric.SetEmptySum()
// The delta value is always positive, so a value accumulated downstream is monotonic
sum.SetIsMonotonic(true)
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
for _, dpCount := range c.counts[name] {
dp := sum.DataPoints().AppendEmpty()
dpCount.attrs.CopyTo(dp.Attributes())
dp.SetIntValue(int64(dpCount.count))
// TODO determine appropriate start time
dp.SetTimestamp(pcommon.NewTimestampFromTime(c.timestamp))
}
}
}
60 changes: 60 additions & 0 deletions connector/bytesconnector/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:generate mdatagen metadata.yaml

package bytesconnector // import import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/bytesconnector"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"

"github.com/open-telemetry/opentelemetry-collector-contrib/connector/bytesconnector/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
)

// NewFactory returns a ConnectorFactory.
func NewFactory() connector.Factory {
return connector.NewFactory(
metadata.Type,
createDefaultConfig,
connector.WithLogsToMetrics(createLogsToMetrics, metadata.LogsToMetricsStability),
)
}

// createDefaultConfig creates the default configuration.
func createDefaultConfig() component.Config {
return &Config{}
}

// createLogsToMetrics creates a logs to metrics connector based on provided config.
func createLogsToMetrics(
_ context.Context,
set connector.Settings,
cfg component.Config,
nextConsumer consumer.Metrics,
) (connector.Logs, error) {
c := cfg.(*Config)

metricDefs := make(map[string]metricDef[ottllog.TransformContext], len(c.Logs))
for name, info := range c.Logs {
md := metricDef[ottllog.TransformContext]{
desc: info.Description,
attrs: info.Attributes,
}
metricDefs[name] = md
}

return &count{
metricsConsumer: nextConsumer,
logsMetricDefs: metricDefs,
}, nil
}

type metricDef[K any] struct {
desc string
attrs []AttributeConfig
}
Loading

0 comments on commit 2abb3d3

Please sign in to comment.