Skip to content

Commit

Permalink
fix: cure span logs by cutting them by half every time we make a pass. (
Browse files Browse the repository at this point in the history
#87)

* fix: cure span logs by cutting them by half every time we make a pass.

* compute minSpanLogsArrSize constant
  • Loading branch information
tim-mwangi authored Sep 16, 2022
1 parent 8fcea70 commit c877b2e
Show file tree
Hide file tree
Showing 2 changed files with 204 additions and 4 deletions.
72 changes: 70 additions & 2 deletions exporter/kafkaexporter/jaeger_marshaler_curer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/hex"
"fmt"
"log"
"math"
"strconv"
"strings"

Expand All @@ -25,9 +26,13 @@ const (
maxTruncationTries = 5 // maximum number of times to attempt to truncate tag values.
// suffix used for new attributes created for those whose values have been truncated
// while curing the spans
truncationTagSuffix = ".htcollector.truncated"
truncationTagSuffix = ".htcollector.truncated"
spanLogsTruncationTagName = "htcollector.spanlogstruncated"
)

// We will attempt to truncate span logs only if the number is greater than 2 ^ maxTruncationTries
var minSpanLogsArrSize = int(math.Pow(float64(2), float64(maxTruncationTries)))

type jaegerMarshalerCurer struct {
marshaler jaegerSpanMarshaler
version sarama.KafkaVersion
Expand Down Expand Up @@ -127,7 +132,9 @@ func (j jaegerMarshalerCurer) spanAsString(span *jaegerproto.Span) string {
}
sb.WriteString("},")
}
sb.WriteString("]")
sb.WriteString("],")
sb.WriteString(fmt.Sprintf("logs count: %d", len(span.Logs)))
sb.WriteString("}")

return sb.String()
}
Expand Down Expand Up @@ -199,9 +206,56 @@ func (j jaegerMarshalerCurer) cureSpan(span *jaegerproto.Span, topic string) (*s
attributeValueSize = attributeValueSize / 2
}

// truncating span attributes did not work. try truncating span logs if they are available.
// attempt to truncate only if the number of span logs is greater than 2 ^ maxTruncationTries
if len(span.Logs) >= minSpanLogsArrSize {
return j.cureSpanLogs(span, topic)
}

return nil, fmt.Errorf("unable to cure span in %d truncation tries", maxTruncationTries)
}

// if log events are causing the span to be over 1MiB, then this is because there's a
// whole bunch of them and most probably they are repeated. I don't think logs going over 1MiB
// could be caused by the size of the log messages themselves.
// we cut the log events by half every time we make a pass.
func (j jaegerMarshalerCurer) cureSpanLogs(span *jaegerproto.Span, topic string) (*sarama.ProducerMessage, error) {
var appendedTruncationTag bool
for truncationTry := 0; truncationTry < maxTruncationTries; truncationTry++ {
span.Logs = cutSpanLogsByHalf(span.Logs)

// append the "htcollector.spanlogstruncated" attribute to the span tags if it's not been added.
if !appendedTruncationTag {
span.Tags = append(span.Tags, jaegerproto.KeyValue{
Key: spanLogsTruncationTagName,
VType: jaegerproto.ValueType_BOOL,
VBool: true,
})
appendedTruncationTag = true
}

bts, err := j.marshaler.marshal(span)
// return err if there is a problem marshaling
if err != nil {
return nil, err
}
key := []byte(span.TraceID.String())
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(bts),
Key: sarama.ByteEncoder(key),
}

// Check if the size is less than the max and if it is return. Otherwise do another pass and cut the logs by half
messageSize := byteSize(msg, j.version)
if messageSize <= j.maxMessageBytes {
return msg, nil
}
}

return nil, fmt.Errorf("unable to cure span logs in %d truncation tries", maxTruncationTries)
}

func valueToString(kv jaegerproto.KeyValue) string {
if kv.VType == jaegerproto.ValueType_STRING {
return kv.GetVStr()
Expand Down Expand Up @@ -255,3 +309,17 @@ func byteSize(m *sarama.ProducerMessage, v sarama.KafkaVersion) int {
}
return size
}

// cutSpanLogsByHalf returns the spanLogs with an even-numbered index in the span
// logs array effectively returning half of the span logs in the array.
func cutSpanLogsByHalf(origArr []jaegerproto.Log) []jaegerproto.Log {
newSize := len(origArr) / 2
if len(origArr)%2 != 0 {
newSize = newSize + 1
}
truncatedSpanLogs := make([]jaegerproto.Log, newSize)
for i := 0; i < newSize; i++ {
truncatedSpanLogs[i] = origArr[i*2]
}
return truncatedSpanLogs
}
136 changes: 134 additions & 2 deletions exporter/kafkaexporter/jaeger_marshaler_curer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/gogo/protobuf/jsonpb"
Expand All @@ -21,6 +22,7 @@ func TestJaegerMarshalerCurer(t *testing.T) {
maxMessageBytes := 1024
maxAttributeValueSize := 256
jsonMarshaler := &jsonpb.Marshaler{}
ts := pcommon.NewTimestampFromTime(time.Now())

td := ptrace.NewTraces()
rs := td.ResourceSpans().AppendEmpty()
Expand Down Expand Up @@ -59,6 +61,38 @@ func TestJaegerMarshalerCurer(t *testing.T) {
}
span.Attributes().Insert("big-tag", pcommon.NewValueString(createLongString(maxMessageBytes, "a")))

// Will cure this span by curing the span logs
span = ils.Spans().AppendEmpty()
span.SetName("bar")
span.SetStartTimestamp(pcommon.Timestamp(101))
span.SetEndTimestamp(pcommon.Timestamp(226))
span.SetTraceID(pcommon.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}))
span.SetSpanID(pcommon.NewSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}))
span.Attributes().Insert("tag10", pcommon.NewValueString("tag10-val"))
span.Attributes().Insert("tag11", pcommon.NewValueString("tag11-val"))
// Add events to span
for i := 0; i < 128; i++ {
se := span.Events().AppendEmpty()
se.SetName(createLongString(1, "a"))
se.SetTimestamp(ts)
}

// Will be unable to cure this span
span = ils.Spans().AppendEmpty()
span.SetName("bar")
span.SetStartTimestamp(pcommon.Timestamp(102))
span.SetEndTimestamp(pcommon.Timestamp(227))
span.SetTraceID(pcommon.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}))
span.SetSpanID(pcommon.NewSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}))
span.Attributes().Insert("tag10", pcommon.NewValueString("tag10-val"))
span.Attributes().Insert("tag11", pcommon.NewValueString("tag11-val"))
// Add events to span
for i := 0; i < 1024; i++ {
se := span.Events().AppendEmpty()
se.SetName(createLongString(1, "a"))
se.SetTimestamp(ts)
}

batches, err := jaeger.ProtoFromTraces(td)
require.NoError(t, err)

Expand All @@ -71,7 +105,7 @@ func TestJaegerMarshalerCurer(t *testing.T) {
jsonByteBuffer0 := new(bytes.Buffer)
require.NoError(t, jsonMarshaler.Marshal(jsonByteBuffer0, batches[0].Spans[0]))

// Get the marshalled bytes of the 2nd span that cannot be cured. Will be the needed for expected value of the tests
// Get the marshalled bytes of the 3rd span that cannot be cured. Will be the needed for expected value of the tests
// depending on whether dropSpans is turned on or not.
batches[0].Spans[2].Process = batches[0].Process
jaegerProtoBytes2, err := batches[0].Spans[2].Marshal()
Expand All @@ -81,8 +115,19 @@ func TestJaegerMarshalerCurer(t *testing.T) {
jsonByteBuffer2 := new(bytes.Buffer)
require.NoError(t, jsonMarshaler.Marshal(jsonByteBuffer2, batches[0].Spans[2]))

// Get the marshalled bytes of the 5th span that cannot be cured. Will be the needed for expected value of the tests
// depending on whether dropSpans is turned on or not.
batches[0].Spans[4].Process = batches[0].Process
jaegerProtoBytes4, err := batches[0].Spans[4].Marshal()
require.NoError(t, err)
require.NotNil(t, jaegerProtoBytes4)

jsonByteBuffer4 := new(bytes.Buffer)
require.NoError(t, jsonMarshaler.Marshal(jsonByteBuffer4, batches[0].Spans[4]))

// expected cured spans should be similar to spans that came in as if they were already cured.
// batches[0].Spans[1] when cured will be the same as curedBatches[0].Spans[0]
// batches[0].Spans[1] when cured will be the same as curedBatches[0].Spans[0] except for the
// cured attribute.
curedTd := ptrace.NewTraces()
curedRs := curedTd.ResourceSpans().AppendEmpty()
curedRs.Resource().Attributes().Insert("test-key", pcommon.NewValueString("test-val"))
Expand All @@ -98,6 +143,42 @@ func TestJaegerMarshalerCurer(t *testing.T) {
curedSpan.Attributes().Insert("big-tag", pcommon.NewValueString(createLongString(maxAttributeValueSize, "a")))
curedSpan.Attributes().Insert("big-tag"+truncationTagSuffix, pcommon.NewValueBool(true))

// batches[0].Spans[3] when cured will be the same as curedBatches[0].Spans[1] except for the truncated log events.
curedSpan = curedIls.Spans().AppendEmpty()
curedSpan.SetName("bar")
curedSpan.SetStartTimestamp(pcommon.Timestamp(101))
curedSpan.SetEndTimestamp(pcommon.Timestamp(226))
curedSpan.SetTraceID(pcommon.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}))
curedSpan.SetSpanID(pcommon.NewSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}))
curedSpan.Attributes().Insert("tag10", pcommon.NewValueString("tag10-val"))
curedSpan.Attributes().Insert("tag11", pcommon.NewValueString("tag11-val"))
curedSpan.Attributes().Insert(spanLogsTruncationTagName, pcommon.NewValueBool(true))
// Add events to span
for i := 0; i < 16; i++ {
se := curedSpan.Events().AppendEmpty()
se.SetName(createLongString(1, "a"))
se.SetTimestamp(ts)
}

// For the jaegerJSONSpanMarshaler{ pbMarshaler: &jsonpb.Marshaler{}}, the marshaled log events
// are large even for small log messages. So still the same as batches[0].Spans[3] except for the
// truncated log events.
curedSpan = curedIls.Spans().AppendEmpty()
curedSpan.SetName("bar")
curedSpan.SetStartTimestamp(pcommon.Timestamp(101))
curedSpan.SetEndTimestamp(pcommon.Timestamp(226))
curedSpan.SetTraceID(pcommon.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}))
curedSpan.SetSpanID(pcommon.NewSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}))
curedSpan.Attributes().Insert("tag10", pcommon.NewValueString("tag10-val"))
curedSpan.Attributes().Insert("tag11", pcommon.NewValueString("tag11-val"))
curedSpan.Attributes().Insert(spanLogsTruncationTagName, pcommon.NewValueBool(true))
// Add events to span
for i := 0; i < 4; i++ {
se := curedSpan.Events().AppendEmpty()
se.SetName(createLongString(1, "a"))
se.SetTimestamp(ts)
}

curedBatches, err := jaeger.ProtoFromTraces(curedTd)
require.NoError(t, err)

Expand All @@ -109,6 +190,22 @@ func TestJaegerMarshalerCurer(t *testing.T) {
curedJsonByteBuffer1 := new(bytes.Buffer)
require.NoError(t, jsonMarshaler.Marshal(curedJsonByteBuffer1, curedBatches[0].Spans[0]))

curedBatches[0].Spans[1].Process = curedBatches[0].Process
curedJaegerProtoBytes2, err := curedBatches[0].Spans[1].Marshal()
require.NoError(t, err)
require.NotNil(t, curedJaegerProtoBytes2)

curedJsonByteBuffer2 := new(bytes.Buffer)
require.NoError(t, jsonMarshaler.Marshal(curedJsonByteBuffer2, curedBatches[0].Spans[1]))

curedBatches[0].Spans[2].Process = curedBatches[0].Process
curedJaegerProtoBytes3, err := curedBatches[0].Spans[2].Marshal()
require.NoError(t, err)
require.NotNil(t, curedJaegerProtoBytes3)

curedJsonByteBuffer3 := new(bytes.Buffer)
require.NoError(t, jsonMarshaler.Marshal(curedJsonByteBuffer3, curedBatches[0].Spans[2]))

tests := []struct {
unmarshaler TracesMarshaler
encoding string
Expand All @@ -126,6 +223,8 @@ func TestJaegerMarshalerCurer(t *testing.T) {
{Topic: "topic", Value: sarama.ByteEncoder(jaegerProtoBytes0), Key: sarama.ByteEncoder(messageKey)},
{Topic: "topic", Value: sarama.ByteEncoder(curedJaegerProtoBytes1), Key: sarama.ByteEncoder(messageKey)},
{Topic: "topic", Value: sarama.ByteEncoder(jaegerProtoBytes2), Key: sarama.ByteEncoder(messageKey)},
{Topic: "topic", Value: sarama.ByteEncoder(curedJaegerProtoBytes2), Key: sarama.ByteEncoder(messageKey)},
{Topic: "topic", Value: sarama.ByteEncoder(jaegerProtoBytes4), Key: sarama.ByteEncoder(messageKey)},
},
},
{
Expand All @@ -142,6 +241,8 @@ func TestJaegerMarshalerCurer(t *testing.T) {
{Topic: "topic", Value: sarama.ByteEncoder(jsonByteBuffer0.Bytes()), Key: sarama.ByteEncoder(messageKey)},
{Topic: "topic", Value: sarama.ByteEncoder(curedJsonByteBuffer1.Bytes()), Key: sarama.ByteEncoder(messageKey)},
{Topic: "topic", Value: sarama.ByteEncoder(jsonByteBuffer2.Bytes()), Key: sarama.ByteEncoder(messageKey)},
{Topic: "topic", Value: sarama.ByteEncoder(curedJsonByteBuffer3.Bytes()), Key: sarama.ByteEncoder(messageKey)},
{Topic: "topic", Value: sarama.ByteEncoder(jsonByteBuffer4.Bytes()), Key: sarama.ByteEncoder(messageKey)},
},
},
{
Expand All @@ -157,6 +258,8 @@ func TestJaegerMarshalerCurer(t *testing.T) {
{Topic: "topic", Value: sarama.ByteEncoder(jaegerProtoBytes0), Key: sarama.ByteEncoder(messageKey)},
{Topic: "topic", Value: sarama.ByteEncoder(curedJaegerProtoBytes1), Key: sarama.ByteEncoder(messageKey)},
{Topic: "topic", Value: sarama.ByteEncoder(jaegerProtoBytes2), Key: sarama.ByteEncoder(messageKey)},
{Topic: "topic", Value: sarama.ByteEncoder(curedJaegerProtoBytes2), Key: sarama.ByteEncoder(messageKey)},
{Topic: "topic", Value: sarama.ByteEncoder(jaegerProtoBytes4), Key: sarama.ByteEncoder(messageKey)},
},
},
{
Expand All @@ -171,6 +274,7 @@ func TestJaegerMarshalerCurer(t *testing.T) {
messages: []*sarama.ProducerMessage{
{Topic: "topic", Value: sarama.ByteEncoder(jaegerProtoBytes0), Key: sarama.ByteEncoder(messageKey)},
{Topic: "topic", Value: sarama.ByteEncoder(curedJaegerProtoBytes1), Key: sarama.ByteEncoder(messageKey)},
{Topic: "topic", Value: sarama.ByteEncoder(curedJaegerProtoBytes2), Key: sarama.ByteEncoder(messageKey)},
},
},
}
Expand Down Expand Up @@ -351,6 +455,34 @@ func TestJaegerMarshalerCurerCureSpansFail(t *testing.T) {
assert.Nil(t, msg)
}

func TestCutSpanLogsByHalf(t *testing.T) {
now := time.Now()
jpl1 := jaegerproto.Log{Timestamp: now}
jpl2 := jaegerproto.Log{Timestamp: now.Add(time.Minute * 2)}
jpl3 := jaegerproto.Log{Timestamp: now.Add(time.Minute * 4)}
jpl4 := jaegerproto.Log{Timestamp: now.Add(time.Minute * 6)}
jpl5 := jaegerproto.Log{Timestamp: now.Add(time.Minute * 8)}
jpl6 := jaegerproto.Log{Timestamp: now.Add(time.Minute * 10)}
assert.Equal(t,
[]jaegerproto.Log{jpl1},
cutSpanLogsByHalf([]jaegerproto.Log{jpl1}))
assert.Equal(t,
[]jaegerproto.Log{jpl1},
cutSpanLogsByHalf([]jaegerproto.Log{jpl1, jpl2}))
assert.Equal(t,
[]jaegerproto.Log{jpl1, jpl3},
cutSpanLogsByHalf([]jaegerproto.Log{jpl1, jpl2, jpl3}))
assert.Equal(t,
[]jaegerproto.Log{jpl1, jpl3},
cutSpanLogsByHalf([]jaegerproto.Log{jpl1, jpl2, jpl3, jpl4}))
assert.Equal(t,
[]jaegerproto.Log{jpl1, jpl3, jpl5},
cutSpanLogsByHalf([]jaegerproto.Log{jpl1, jpl2, jpl3, jpl4, jpl5}))
assert.Equal(t,
[]jaegerproto.Log{jpl1, jpl3, jpl5},
cutSpanLogsByHalf([]jaegerproto.Log{jpl1, jpl2, jpl3, jpl4, jpl5, jpl6}))
}

func createLongString(n int, s string) string {
var b strings.Builder
b.Grow(n * len(s))
Expand Down

0 comments on commit c877b2e

Please sign in to comment.