Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 6 additions & 26 deletions ddtrace/tracer/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,15 @@ func newPayload(protocol float64) payload {

// https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
const (
// arrays
msgpackArrayFix byte = 144 // up to 15 items
msgpackArray16 byte = 0xdc // up to 2^16-1 items, followed by size in 2 bytes
msgpackArray32 byte = 0xdd // up to 2^32-1 items, followed by size in 4 bytes

// maps
msgpackMapFix byte = 0x80 // up to 15 items
msgpackMap16 byte = 0xde // up to 2^16-1 items, followed by size in 2 bytes
msgpackMap32 byte = 0xdf // up to 2^32-1 items, followed by size in 4 bytes
)

// safePayload provides a thread-safe wrapper around payload.
Expand Down Expand Up @@ -152,29 +158,3 @@ func (sp *safePayload) protocol() float64 {
// Protocol is immutable after creation - no lock needed
return sp.p.protocol()
}

// traceChunk represents a list of spans with the same trace ID,
// i.e. a chunk of a trace
type traceChunk struct {
// the sampling priority of the trace
priority int32 `msg:"priority"`

// the optional string origin ("lambda", "rum", etc.) of the trace chunk
origin string `msg:"origin,omitempty"`

// a collection of key to value pairs common in all `spans`
attributes keyValueList `msg:"attributes,omitempty"`

// a list of spans in this chunk
spans spanList `msg:"spans,omitempty"`

// whether the trace only contains analyzed spans
// (not required by tracers and set by the agent)
droppedTrace bool `msg:"droppedTrace"`

// the ID of the trace to which all spans in this chunk belong
traceID []byte `msg:"traceID"`

// the optional string decision maker (previously span tag _dd.p.dm)
samplingMechanism string `msg:"samplingMechanism,omitempty"`
}
81 changes: 69 additions & 12 deletions ddtrace/tracer/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ import (
"bytes"
"fmt"
"io"
"math"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"

"github.com/DataDog/dd-trace-go/v2/internal/globalconfig"
"github.com/DataDog/dd-trace-go/v2/internal/version"
"github.com/stretchr/testify/assert"
"github.com/tinylib/msgp/msgp"
)
Expand Down Expand Up @@ -82,17 +85,18 @@ func TestPayloadV04Decode(t *testing.T) {
func TestPayloadV1Decode(t *testing.T) {
for _, n := range []int{10, 1 << 10} {
t.Run(strconv.Itoa(n), func(t *testing.T) {
assert := assert.New(t)
p := newPayloadV1()

p.containerID = "containerID"
p.languageName = "languageName"
p.languageVersion = "languageVersion"
p.tracerVersion = "tracerVersion"
p.runtimeID = "runtimeID"
p.env = "env"
p.hostname = "hostname"
p.appVersion = "appVersion"
var (
assert = assert.New(t)
p = newPayloadV1()
)
p.SetContainerID("containerID")
p.SetLanguageName("go")
p.SetLanguageVersion("1.25")
p.SetTracerVersion(version.Tag)
p.SetRuntimeID(globalconfig.RuntimeID())
p.SetEnv("test")
p.SetHostname("hostname")
p.SetAppVersion("appVersion")

for i := 0; i < n; i++ {
_, _ = p.push(newSpanList(i%5 + 1))
Expand All @@ -102,9 +106,14 @@ func TestPayloadV1Decode(t *testing.T) {
assert.NoError(err)

got := newPayloadV1()
_, err = got.Decode(encoded)
buf := bytes.NewBuffer(encoded)
_, err = buf.WriteTo(got)
assert.NoError(err)

o, err := got.hydrate()
assert.NoError(err)
assert.Empty(o)
assert.Equal(p.fields, got.fields)
assert.Equal(p.containerID, got.containerID)
assert.Equal(p.languageName, got.languageName)
assert.Equal(p.languageVersion, got.languageVersion)
Expand All @@ -113,6 +122,54 @@ func TestPayloadV1Decode(t *testing.T) {
assert.Equal(p.env, got.env)
assert.Equal(p.hostname, got.hostname)
assert.Equal(p.appVersion, got.appVersion)
assert.Equal(p.fields, got.fields)
})
}
}

func TestPayloadV1EmbeddedStreamingStringTable(t *testing.T) {
p := newPayloadV1()
p.SetHostname("production")
p.SetEnv("production")
p.SetLanguageName("go")

assert := assert.New(t)
encoded, err := io.ReadAll(p)
assert.NoError(err)

got := newPayloadV1()
buf := bytes.NewBuffer(encoded)
_, err = buf.WriteTo(got)
assert.NoError(err)

o, err := got.hydrate()
assert.NoError(err)
assert.Empty(o)
assert.Equal(p.languageName, got.languageName)
assert.Equal(p.hostname, got.hostname)
assert.Equal(p.env, got.env)
}

func TestPayloadV1UpdateHeader(t *testing.T) {
testCases := []uint32{ // Number of items
15,
math.MaxUint16,
math.MaxUint32,
}
for _, tc := range testCases {
t.Run(fmt.Sprintf("n=%d", tc), func(t *testing.T) {
var (
p = payloadV1{
fields: tc,
header: make([]byte, 8),
}
expected []byte
)
expected = msgp.AppendMapHeader(expected, tc)
p.updateHeader()
if got := p.header[p.off:]; !bytes.Equal(expected, got) {
t.Fatalf("expected %+v, got %+v", expected, got)
}
})
}
}
Expand Down
Loading
Loading