Skip to content

Commit

Permalink
Add user fields mapping to otlp
Browse files Browse the repository at this point in the history
  • Loading branch information
ericywl committed Jan 24, 2025
1 parent 99f84c0 commit f86e95f
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 5 deletions.
5 changes: 5 additions & 0 deletions input/otlp/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ func (c *Consumer) convertLogRecord(
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Namespace = sanitizeDataStreamNamespace(v.Str())

// user.*
case attributeUserID, attributeUserName, attributeUserEmail:
addUserFields(k, v, event)

default:
setLabel(replaceDots(k), event, v)
}
Expand Down
37 changes: 37 additions & 0 deletions input/otlp/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,43 @@ func processLogEvents(t *testing.T, logs plog.Logs) modelpb.Batch {
return processed
}

func TestConsumeLogsUserFields(t *testing.T) {
for _, tc := range userFieldsTestCases() {
t.Run(tc.name, func(t *testing.T) {
// Setup logs
logs := plog.NewLogs()
resourceLogs := logs.ResourceLogs().AppendEmpty()
scopeLogs := resourceLogs.ScopeLogs().AppendEmpty()
// Add process owner in resource metadata attributes
if tc.processOwner != "" {
resourceLogs.Resource().Attributes().PutStr(semconv.AttributeProcessOwner, tc.processOwner)
}
// Add attributes
logRecord := newLogRecord("foobar")
if tc.input.userID != "" {
logRecord.Attributes().PutStr("user.id", tc.input.userID)
}
if tc.input.userName != "" {
logRecord.Attributes().PutStr("user.name", tc.input.userName)
}
if tc.input.userEmail != "" {
logRecord.Attributes().PutStr("user.email", tc.input.userEmail)
}
// Copy log record
logRecord.CopyTo(scopeLogs.LogRecords().AppendEmpty())

processed := processLogEvents(t, logs)
expectedUser := &modelpb.User{
Id: tc.expected.userID,
Email: tc.expected.userEmail,
Name: tc.expected.userName,
}
assert.Len(t, processed, 1)
assert.EqualValues(t, expectedUser, processed[0].User)
})
}
}

func TestConsumerConsumeLogsDataStream(t *testing.T) {
randomString := strings.Repeat("abcdefghijklmnopqrstuvwxyz0123456789", 10)
maxLenNamespace := otlp.MaxDataStreamBytes - len(otlp.DisallowedNamespaceRunes)
Expand Down
16 changes: 16 additions & 0 deletions input/otlp/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,3 +580,19 @@ func replaceReservedRune(disallowedRunes string) func(r rune) rune {
return unicode.ToLower(r)
}
}

// NOTE: This will overwrite user fields set by `user.name=process.owner` from `otlp/metadata.go:translateResourceMetadata`.
func addUserFields(attrKey string, attrVal pcommon.Value, updateEvent *modelpb.APMEvent) {
if updateEvent.User == nil {
updateEvent.User = &modelpb.User{}
}

switch attrKey {
case attributeUserID:
updateEvent.User.Id = truncate(attrVal.Str())
case attributeUserName:
updateEvent.User.Name = truncate(attrVal.Str())
case attributeUserEmail:
updateEvent.User.Email = truncate(attrVal.Str())
}
}
73 changes: 73 additions & 0 deletions input/otlp/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package otlp_test

import (
"strings"
"testing"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -413,3 +414,75 @@ func transformResourceMetadata(t *testing.T, resourceAttrs map[string]interface{
(*events)[0].Timestamp = 0
return (*events)[0]
}

type userField struct {
userID string
userName string
userEmail string
}

type userFieldsTestCase struct {
name string
processOwner string
input userField
expected userField
}

func userFieldsTestCases() []userFieldsTestCase {
return []userFieldsTestCase{
{
name: "default",
input: userField{
userID: "123",
userName: "hello",
userEmail: "[email protected]",
},
expected: userField{
userID: "123",
userName: "hello",
userEmail: "[email protected]",
},
},
{
name: "process owner not overwritten",
processOwner: "i-am-the-owner",
input: userField{
userID: "123",
userName: "",
userEmail: "[email protected]",
},
expected: userField{
userID: "123",
userName: "i-am-the-owner",
userEmail: "[email protected]",
},
},
{
name: "overwrite process owner",
processOwner: "i-am-the-owner",
input: userField{
userID: "123",
userName: "hello",
userEmail: "[email protected]",
},
expected: userField{
userID: "123",
userName: "hello",
userEmail: "[email protected]",
},
},
{
name: "truncate",
input: userField{
userID: strings.Repeat("a", 2000),
userName: strings.Repeat("b", 2000),
userEmail: strings.Repeat("c", 2000),
},
expected: userField{
userID: strings.Repeat("a", 1024),
userName: strings.Repeat("b", 1024),
userEmail: strings.Repeat("c", 1024),
},
},
}
}
10 changes: 5 additions & 5 deletions input/otlp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,11 @@ func (c *Consumer) handleScopeMetrics(
event.Event = &modelpb.Event{}
}
event.Event.Module = v.Str()
case "user.name":
if event.User == nil {
event.User = &modelpb.User{}
}
event.User.Name = truncate(v.Str())

// user.*
case attributeUserID, attributeUserName, attributeUserEmail:
addUserFields(k, v, event)

default:
setLabel(k, event, v)
}
Expand Down
56 changes: 56 additions & 0 deletions input/otlp/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
semconv "go.opentelemetry.io/collector/semconv/v1.5.0"
"golang.org/x/sync/semaphore"
"google.golang.org/protobuf/testing/protocmp"

Expand Down Expand Up @@ -736,6 +737,61 @@ func TestConsumeMetricsExportTimestamp(t *testing.T) {
}
}

func TestConsumeMetricsUserFields(t *testing.T) {
for _, tc := range userFieldsTestCases() {
t.Run(tc.name, func(t *testing.T) {
// Setup metric
metrics := pmetric.NewMetrics()
resourceMetrics := metrics.ResourceMetrics().AppendEmpty()
scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty()
metric := scopeMetrics.Metrics().AppendEmpty()
metric.SetName("test_sum")
// Add data point
timestamp := time.Unix(123456789, 0).UTC()
dp := metric.SetEmptySum().DataPoints().AppendEmpty()
dp.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
dp.SetIntValue(42)
// Add process owner in resource metadata attributes
if tc.processOwner != "" {
resourceMetrics.Resource().Attributes().PutStr(semconv.AttributeProcessOwner, tc.processOwner)
}
// Add attributes
if tc.input.userID != "" {
dp.Attributes().PutStr("user.id", tc.input.userID)
}
if tc.input.userName != "" {
dp.Attributes().PutStr("user.name", tc.input.userName)
}
if tc.input.userEmail != "" {
dp.Attributes().PutStr("user.email", tc.input.userEmail)
}

events, _, result, err := transformMetrics(t, metrics)
assert.NoError(t, err)
assert.Equal(t, otlp.ConsumeMetricsResult{}, result)

expected := []*modelpb.APMEvent{{
Agent: &modelpb.Agent{Name: "otlp", Version: "unknown"},
Service: &modelpb.Service{Name: "unknown", Language: &modelpb.Language{Name: "unknown"}},
Timestamp: modelpb.FromTime(timestamp),
Metricset: &modelpb.Metricset{
Name: "app",
Samples: []*modelpb.MetricsetSample{
{Name: "test_sum", Value: 42, Type: modelpb.MetricType_METRIC_TYPE_COUNTER},
},
},
User: &modelpb.User{
Id: tc.expected.userID,
Name: tc.expected.userName,
Email: tc.expected.userEmail,
},
}}

eventsMatch(t, expected, events)
})
}
}

func TestConsumeMetricsDataStream(t *testing.T) {
randomString := strings.Repeat("abcdefghijklmnopqrstuvwxyz0123456789", 10)
maxLenNamespace := otlp.MaxDataStreamBytes - len(otlp.DisallowedNamespaceRunes)
Expand Down
10 changes: 10 additions & 0 deletions input/otlp/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ const (
attributeDataStreamDataset = "data_stream.dataset"
attributeDataStreamNamespace = "data_stream.namespace"
attributeGenAiSystem = "gen_ai.system"
// User fields, refer to https://github.com/elastic/apm-server/issues/15254.
// There are other user fields in OTel, but they don't have 1-to-1 mapping to `APMEvent.User`.
attributeUserName = "user.name"
attributeUserID = "user.id"
attributeUserEmail = "user.email"
)

// ConsumeTracesResult contains the number of rejected spans and error message for partial success response.
Expand Down Expand Up @@ -1170,6 +1175,11 @@ func (c *Consumer) convertSpanEvent(
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Namespace = sanitizeDataStreamNamespace(v.Str())

// user.*
case attributeUserID, attributeUserName, attributeUserEmail:
addUserFields(k, v, event)

default:
k = replaceDots(k)
if isJaeger && k == "message" {
Expand Down
39 changes: 39 additions & 0 deletions input/otlp/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1401,6 +1401,45 @@ func TestSpanCodeStacktrace(t *testing.T) {
})
}

func TestConsumeTracesUserFields(t *testing.T) {
for _, tc := range userFieldsTestCases() {
t.Run(tc.name, func(t *testing.T) {
// Setup span
traces, spans := newTracesSpans()
otelSpan := spans.Spans().AppendEmpty()
otelSpan.SetTraceID(pcommon.TraceID{111})
otelSpan.SetSpanID(pcommon.SpanID{222})
// Add process owner in resource metadata attributes
if tc.processOwner != "" {
traces.ResourceSpans().At(0).Resource().Attributes().PutStr(semconv.AttributeProcessOwner, tc.processOwner)
}
// Add attributes
timestamp := time.Unix(123456789, 0).UTC()
spanEvent := otelSpan.Events().AppendEmpty()
spanEvent.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
if tc.input.userID != "" {
spanEvent.Attributes().PutStr("user.id", tc.input.userID)
}
if tc.input.userName != "" {
spanEvent.Attributes().PutStr("user.name", tc.input.userName)
}
if tc.input.userEmail != "" {
spanEvent.Attributes().PutStr("user.email", tc.input.userEmail)
}

allEvents := transformTraces(t, traces)
events := (*allEvents)[1:]
expectedUser := &modelpb.User{
Id: tc.expected.userID,
Email: tc.expected.userEmail,
Name: tc.expected.userName,
}
assert.Len(t, events, 1)
assert.EqualValues(t, expectedUser, events[0].User)
})
}
}

func TestSpanEventsDataStream(t *testing.T) {
for _, isException := range []bool{false, true} {
t.Run(fmt.Sprintf("isException=%v", isException), func(t *testing.T) {
Expand Down

0 comments on commit f86e95f

Please sign in to comment.