diff --git a/ddtrace/tracer/payload.go b/ddtrace/tracer/payload.go index 70be2dd865..8d8d04979e 100644 --- a/ddtrace/tracer/payload.go +++ b/ddtrace/tracer/payload.go @@ -60,9 +60,15 @@ func newPayload(protocol float64) payload { // https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family const ( + // arrays msgpackArrayFix byte = 144 // up to 15 items msgpackArray16 byte = 0xdc // up to 2^16-1 items, followed by size in 2 bytes msgpackArray32 byte = 0xdd // up to 2^32-1 items, followed by size in 4 bytes + + // maps + msgpackMapFix byte = 0x80 // up to 15 items + msgpackMap16 byte = 0xde // up to 2^16-1 items, followed by size in 2 bytes + msgpackMap32 byte = 0xdf // up to 2^32-1 items, followed by size in 4 bytes ) // safePayload provides a thread-safe wrapper around payload. @@ -152,29 +158,3 @@ func (sp *safePayload) protocol() float64 { // Protocol is immutable after creation - no lock needed return sp.p.protocol() } - -// traceChunk represents a list of spans with the same trace ID, -// i.e. a chunk of a trace -type traceChunk struct { - // the sampling priority of the trace - priority int32 `msg:"priority"` - - // the optional string origin ("lambda", "rum", etc.) of the trace chunk - origin string `msg:"origin,omitempty"` - - // a collection of key to value pairs common in all `spans` - attributes keyValueList `msg:"attributes,omitempty"` - - // a list of spans in this chunk - spans spanList `msg:"spans,omitempty"` - - // whether the trace only contains analyzed spans - // (not required by tracers and set by the agent) - droppedTrace bool `msg:"droppedTrace"` - - // the ID of the trace to which all spans in this chunk belong - traceID []byte `msg:"traceID"` - - // the optional string decision maker (previously span tag _dd.p.dm) - samplingMechanism string `msg:"samplingMechanism,omitempty"` -} diff --git a/ddtrace/tracer/payload_test.go b/ddtrace/tracer/payload_test.go index 4c840904b5..7b009eead1 100644 --- a/ddtrace/tracer/payload_test.go +++ b/ddtrace/tracer/payload_test.go @@ -9,12 +9,15 @@ import ( "bytes" "fmt" "io" + "math" "strconv" "strings" "sync" "sync/atomic" "testing" + "github.com/DataDog/dd-trace-go/v2/internal/globalconfig" + "github.com/DataDog/dd-trace-go/v2/internal/version" "github.com/stretchr/testify/assert" "github.com/tinylib/msgp/msgp" ) @@ -82,17 +85,18 @@ func TestPayloadV04Decode(t *testing.T) { func TestPayloadV1Decode(t *testing.T) { for _, n := range []int{10, 1 << 10} { t.Run(strconv.Itoa(n), func(t *testing.T) { - assert := assert.New(t) - p := newPayloadV1() - - p.containerID = "containerID" - p.languageName = "languageName" - p.languageVersion = "languageVersion" - p.tracerVersion = "tracerVersion" - p.runtimeID = "runtimeID" - p.env = "env" - p.hostname = "hostname" - p.appVersion = "appVersion" + var ( + assert = assert.New(t) + p = newPayloadV1() + ) + p.SetContainerID("containerID") + p.SetLanguageName("go") + p.SetLanguageVersion("1.25") + p.SetTracerVersion(version.Tag) + p.SetRuntimeID(globalconfig.RuntimeID()) + p.SetEnv("test") + p.SetHostname("hostname") + p.SetAppVersion("appVersion") for i := 0; i < n; i++ { _, _ = p.push(newSpanList(i%5 + 1)) @@ -102,9 +106,14 @@ func TestPayloadV1Decode(t *testing.T) { assert.NoError(err) got := newPayloadV1() - _, err = got.Decode(encoded) + buf := bytes.NewBuffer(encoded) + _, err = buf.WriteTo(got) assert.NoError(err) + o, err := got.hydrate() + assert.NoError(err) + assert.Empty(o) + assert.Equal(p.fields, got.fields) assert.Equal(p.containerID, got.containerID) assert.Equal(p.languageName, got.languageName) assert.Equal(p.languageVersion, got.languageVersion) @@ -113,6 +122,54 @@ func TestPayloadV1Decode(t *testing.T) { assert.Equal(p.env, got.env) assert.Equal(p.hostname, got.hostname) assert.Equal(p.appVersion, got.appVersion) + assert.Equal(p.fields, got.fields) + }) + } +} + +func TestPayloadV1EmbeddedStreamingStringTable(t *testing.T) { + p := newPayloadV1() + p.SetHostname("production") + p.SetEnv("production") + p.SetLanguageName("go") + + assert := assert.New(t) + encoded, err := io.ReadAll(p) + assert.NoError(err) + + got := newPayloadV1() + buf := bytes.NewBuffer(encoded) + _, err = buf.WriteTo(got) + assert.NoError(err) + + o, err := got.hydrate() + assert.NoError(err) + assert.Empty(o) + assert.Equal(p.languageName, got.languageName) + assert.Equal(p.hostname, got.hostname) + assert.Equal(p.env, got.env) +} + +func TestPayloadV1UpdateHeader(t *testing.T) { + testCases := []uint32{ // Number of items + 15, + math.MaxUint16, + math.MaxUint32, + } + for _, tc := range testCases { + t.Run(fmt.Sprintf("n=%d", tc), func(t *testing.T) { + var ( + p = payloadV1{ + fields: tc, + header: make([]byte, 8), + } + expected []byte + ) + expected = msgp.AppendMapHeader(expected, tc) + p.updateHeader() + if got := p.header[p.off:]; !bytes.Equal(expected, got) { + t.Fatalf("expected %+v, got %+v", expected, got) + } }) } } diff --git a/ddtrace/tracer/payload_v1.go b/ddtrace/tracer/payload_v1.go index a8044a06e8..407d6ef918 100644 --- a/ddtrace/tracer/payload_v1.go +++ b/ddtrace/tracer/payload_v1.go @@ -10,8 +10,8 @@ import ( "encoding/binary" "fmt" "sync/atomic" + "unique" - "github.com/DataDog/dd-trace-go/v2/ddtrace/ext" "github.com/tinylib/msgp/msgp" ) @@ -29,40 +29,31 @@ import ( // payloadV1 for re-use requires the transport to wait for the HTTP package // Close the request body before attempting to re-use it again! type payloadV1 struct { - // array of strings referenced in this tracer payload, its chunks and spans - // stringTable holds references from a string value to an index. - // the 0th position in the stringTable should always be the empty string. - strings *stringTable `msgp:"strings"` + // setFields tracks which index fields are set (bits 1-11 for field IDs 1-11) + // Bit 0 is unused since field IDs start from 1 + setFields bitmap - // the string ID of the container where the tracer is running - containerID string `msgp:"containerID"` + containerID string - // the string language name of the tracer - languageName string `msgp:"languageName"` + languageName string - // the string language version of the tracer - languageVersion string `msgp:"languageVersion"` + languageVersion string - // the string version of the tracer - tracerVersion string `msgp:"tracerVersion"` + tracerVersion string - // the V4 string UUID representation of a tracer session - runtimeID string `msgp:"runtimeID"` + runtimeID string - // the optional `env` string tag that set with the tracer - env string `msgp:"env,omitempty"` + env string - // the optional string hostname of where the tracer is running - hostname string `msgp:"hostname,omitempty"` + hostname string - // the optional string `version` tag for the application set in the tracer - appVersion string `msgp:"appVersion,omitempty"` + appVersion string // a collection of key to value pairs common in all `chunks` - attributes keyValueList `msgp:"attributes,omitempty"` + attributes keyValueList // a list of trace `chunks` - chunks []traceChunk `msgp:"chunks,omitempty"` + chunks []traceChunk // protocolVersion specifies the trace protocol to use. protocolVersion float64 @@ -75,149 +66,55 @@ type payloadV1 struct { // off specifies the current read position on the header. off int + // writeOff specifies the current write position on the header. + writeOff int + // count specifies the number of items in the stream. count uint32 + // fields specifies the number of fields in the payload. + fields uint32 + // buf holds the sequence of msgpack-encoded items. - buf bytes.Buffer + buf []byte // reader is used for reading the contents of buf. reader *bytes.Reader } -type stringTable struct { - strings []string // list of strings - indices map[string]uint32 // map strings to their indices - nextIndex uint32 // last index of the stringTable -} - -// AnyValue is a representation of the `any` value. It can take the following types: -// - uint32 -// - bool -// - float64 -// - int64 -// - uint8 -// intValue(5) - 0x405 (4 indicates this is an int AnyType, then 5 is encoded using positive fixed int format) -// stringValue(“a”) - 0x1a161 (1 indicates this is a string, then “a” is encoded using fixstr 0xa161) -// stringValue(2) - 0x102 (1 indicates this is a string, then a positive fixed int of 2 refers the 2nd index of the string table) -type anyValue struct { - valueType int - value interface{} -} - -const ( - StringValueType = iota + 1 // string or uint -- 1 - BoolValueType // boolean -- 2 - FloatValueType // float64 -- 3 - IntValueType // int64 -- 4 - BytesValueType // []uint8 -- 5 - ArrayValueType // []AnyValue -- 6 - keyValueListType // []keyValue -- 7 -) - -type arrayValue []anyValue - -// keys in a keyValue can either be a string or a uint32 index -// isString is true when the key is a string value, and false when the key is a uint32 index -type streamingKey struct { - isString bool - stringValue string - idx uint32 -} - -// keyValue is made up of the key and an AnyValue (the type of the value and the value itself) -// The key is either a uint32 index into the string table or a string value. -type keyValue struct { - key streamingKey - value anyValue -} - -type keyValueList []keyValue - // newPayloadV1 returns a ready to use payloadV1. func newPayloadV1() *payloadV1 { return &payloadV1{ protocolVersion: traceProtocolV1, attributes: keyValueList{}, chunks: make([]traceChunk, 0), - strings: newStringTable(), - header: make([]byte, 8), off: 8, + writeOff: 0, } } -func newStringTable() *stringTable { - return &stringTable{ - strings: []string{""}, - indices: map[string]uint32{"": 0}, - nextIndex: 1, - } -} - -func (s *stringTable) Add(str string) { - if _, ok := s.indices[str]; ok { - return - } - s.indices[str] = s.nextIndex - s.strings = append(s.strings, str) - s.nextIndex += 1 -} - // push pushes a new item into the stream. func (p *payloadV1) push(t spanList) (stats payloadStats, err error) { - // We need to hydrate the payload with everything we get from the spans. - // Conceptually, our `t []*Span` corresponds to one `traceChunk`. - origin, priority := "", 0 - for _, span := range t { - if span == nil { - continue - } - if p, ok := span.Context().SamplingPriority(); ok { - origin = span.Context().origin - priority = p - break - } - } - - kv := keyValueList{ - {key: streamingKey{isString: false, idx: 2}, value: anyValue{valueType: StringValueType, value: p.containerID}}, // containerID - {key: streamingKey{isString: false, idx: 3}, value: anyValue{valueType: StringValueType, value: p.languageName}}, // languageName - {key: streamingKey{isString: false, idx: 4}, value: anyValue{valueType: StringValueType, value: p.languageVersion}}, // languageVersion - {key: streamingKey{isString: false, idx: 5}, value: anyValue{valueType: StringValueType, value: p.tracerVersion}}, // tracerVersion - {key: streamingKey{isString: false, idx: 6}, value: anyValue{valueType: StringValueType, value: p.runtimeID}}, // runtimeID - {key: streamingKey{isString: false, idx: 7}, value: anyValue{valueType: StringValueType, value: p.env}}, // env - {key: streamingKey{isString: false, idx: 8}, value: anyValue{valueType: StringValueType, value: p.hostname}}, // hostname - {key: streamingKey{isString: false, idx: 9}, value: anyValue{valueType: StringValueType, value: p.appVersion}}, // appVersion - } - tc := traceChunk{ - priority: int32(priority), - origin: origin, - attributes: keyValueList{}, - spans: t, - traceID: t[0].Context().traceID[:], + spans: t, } p.chunks = append(p.chunks, tc) - wr := msgp.NewWriter(&p.buf) - - err = tc.EncodeMsg(wr, p) - if err != nil { - return payloadStats{}, err - } - - // once we've encoded the spans, we can encode the attributes - kv = append(kv, keyValue{key: streamingKey{isString: false, idx: 10}, value: anyValue{valueType: keyValueListType, value: p.attributes}}) // attributes - err = kv.EncodeMsg(wr, p) - if err == nil { - err = wr.Flush() - } p.recordItem() return p.stats(), err } func (p *payloadV1) grow(n int) { - p.buf.Grow(n) + c := cap(p.buf) - len(p.buf) + // if n fits in current available capacity, don't allocate + if n <= c { + return + } + // allocating 1.5 times what's needed, to reduce allocations + m := n + len(p.buf) + buf := make([]byte, (m+1)*3/2) + copy(buf, p.buf) + p.buf = buf } func (p *payloadV1) reset() { @@ -228,13 +125,13 @@ func (p *payloadV1) reset() { } func (p *payloadV1) clear() { - p.buf = bytes.Buffer{} + p.fields = 0 + p.buf = p.buf[:] p.reader = nil } func (p *payloadV1) recordItem() { atomic.AddUint32(&p.count, 1) - p.updateHeader() } func (p *payloadV1) stats() payloadStats { @@ -245,7 +142,7 @@ func (p *payloadV1) stats() payloadStats { } func (p *payloadV1) size() int { - return p.buf.Len() + len(p.header) - p.off + return len(p.buf) + len(p.header) - p.off } func (p *payloadV1) itemCount() int { @@ -256,927 +153,379 @@ func (p *payloadV1) protocol() float64 { return p.protocolVersion } +// updateHeader updates the payload header based on the number of items currently +// present in the stream. func (p *payloadV1) updateHeader() { - n := uint64(atomic.LoadUint32(&p.count)) + n := uint64(p.fields) switch { case n <= 15: - p.header[7] = msgpackArrayFix + byte(n) + p.header[7] = msgpackMapFix + byte(n) p.off = 7 case n <= 1<<16-1: binary.BigEndian.PutUint64(p.header, n) // writes 2 bytes - p.header[5] = msgpackArray16 + p.header[5] = msgpackMap16 p.off = 5 default: // n <= 1<<32-1 binary.BigEndian.PutUint64(p.header, n) // writes 4 bytes - p.header[3] = msgpackArray32 + p.header[3] = msgpackMap32 p.off = 3 } } -// Close implements io.Closer -func (p *payloadV1) Close() error { - return nil -} - -// Write implements io.Writer. It writes data directly to the buffer. -func (p *payloadV1) Write(data []byte) (n int, err error) { - return p.buf.Write(data) -} - // Read implements io.Reader. It reads from the msgpack-encoded stream. -func (p *payloadV1) Read(b []byte) (n int, err error) { +func (p *payloadV1) Read(b []byte) (int, error) { + var n int + if len(p.header) == 0 { + p.header = make([]byte, 8) + p.updateHeader() + } if p.off < len(p.header) { // reading header n = copy(b, p.header[p.off:]) p.off += n return n, nil } + if len(p.buf) == 0 { + p.encode() + } if p.reader == nil { - p.reader = bytes.NewReader(p.buf.Bytes()) + p.reader = bytes.NewReader(p.buf) } return p.reader.Read(b) } -// Encode the anyValue -func (a *anyValue) EncodeMsg(e *msgp.Writer, p *payloadV1) error { - switch a.valueType { - case StringValueType: - e.WriteInt32(StringValueType) - v, err := p.encodeString(a.value.(string)) - if err != nil { - return err - } - if v.isString { - return e.WriteString(v.stringValue) - } - return e.WriteUint32(v.idx) - case BoolValueType: - e.WriteInt32(BoolValueType) - return e.WriteBool(a.value.(bool)) - case FloatValueType: - e.WriteInt32(FloatValueType) - return e.WriteFloat64(a.value.(float64)) - case IntValueType: - e.WriteInt32(IntValueType) - return e.WriteInt64(a.value.(int64)) - case BytesValueType: - e.WriteInt32(BytesValueType) - return e.WriteBytes(a.value.([]byte)) - case ArrayValueType: - e.WriteInt32(ArrayValueType) - return a.value.(arrayValue).EncodeMsg(e, p) - case keyValueListType: - e.WriteInt32(keyValueListType) - return a.value.(keyValueList).EncodeMsg(e, p) - default: - return fmt.Errorf("invalid value type: %d", a.valueType) - } -} - -func (av arrayValue) EncodeMsg(e *msgp.Writer, p *payloadV1) error { - err := e.WriteArrayHeader(uint32(len(av))) - if err != nil { - return err - } - for _, value := range av { - if err := value.EncodeMsg(e, p); err != nil { - return err - } - } - return nil -} - -func (k keyValue) EncodeMsg(e *msgp.Writer, p *payloadV1) error { - var err error - if k.key.isString { - err = e.WriteString(k.key.stringValue) - } else { - err = e.WriteUint32(k.key.idx) - } - if err != nil { - return err - } - err = k.value.EncodeMsg(e, p) - if err != nil { - return err - } - return nil +func (p *payloadV1) encode() { + seen := newStringTable() + p.encodeField(2, p.containerID, seen) + p.encodeField(3, p.languageName, seen) + p.encodeField(4, p.languageVersion, seen) + p.encodeField(5, p.tracerVersion, seen) + p.encodeField(6, p.runtimeID, seen) + p.encodeField(7, p.env, seen) + p.encodeField(8, p.hostname, seen) + p.encodeField(9, p.appVersion, seen) } -func (kv keyValueList) EncodeMsg(e *msgp.Writer, p *payloadV1) error { - err := e.WriteMapHeader(uint32(len(kv))) - if err != nil { - return err +func (p *payloadV1) encodeField(ref uint32, v string, seen *stringTable) { + if !p.setFields.Has(ref) { + return } - for _, k := range kv { - err = k.EncodeMsg(e, p) - if err != nil { - return err - } + p.buf = msgp.AppendUint32(p.buf, ref) + if idx, ok := seen.Get(v); ok { + p.buf = idx.encode(p.buf) + return } - return nil + w := encodableString(v) + p.buf = w.encode(p.buf) + seen.Add(v) } -func (t *traceChunk) EncodeMsg(e *msgp.Writer, p *payloadV1) error { - e.WriteInt32(11) // write msgp index for `chunks` - - kv := keyValueList{ - {key: streamingKey{isString: false, idx: 1}, value: anyValue{valueType: IntValueType, value: int64(t.priority)}}, // priority - {key: streamingKey{isString: false, idx: 2}, value: anyValue{valueType: StringValueType, value: t.origin}}, // origin - {key: streamingKey{isString: false, idx: 5}, value: anyValue{valueType: BoolValueType, value: t.droppedTrace}}, // droppedTrace - {key: streamingKey{isString: false, idx: 6}, value: anyValue{valueType: BytesValueType, value: t.traceID}}, // traceID - {key: streamingKey{isString: false, idx: 7}, value: anyValue{valueType: StringValueType, value: t.samplingMechanism}}, // samplingMechanism - } - - attr := keyValueList{} - for k, v := range t.attributes { - attr = append(attr, keyValue{key: streamingKey{isString: false, idx: uint32(k)}, value: anyValue{valueType: getAnyValueType(v), value: v}}) - } - kv = append(kv, keyValue{key: streamingKey{isString: false, idx: 3}, value: anyValue{valueType: keyValueListType, value: attr}}) // attributes - - err := kv.EncodeMsg(e, p) - if err != nil { - return err - } - - return EncodeSpanList(t.spans, e, p) +// Write implements io.Writer. It writes data directly to the internal buffers. +func (p *payloadV1) Write(data []byte) (int, error) { + p.buf = append(p.buf, data...) + return len(data), nil } -func EncodeSpanList(s spanList, e *msgp.Writer, p *payloadV1) error { - e.WriteInt32(4) // write msgp index for `spans` - - err := e.WriteArrayHeader(uint32(len(s))) +func (p *payloadV1) hydrate() ([]byte, error) { + n, data, err := msgp.ReadMapHeaderBytes(p.buf) if err != nil { - return msgp.WrapError(err) + return data, err } + p.buf = data - for _, span := range s { - if span == nil { - err := e.WriteNil() - if err != nil { - return err - } - } else { - err := encodeSpan(span, e, p) - if err != nil { - return msgp.WrapError(err, span) - } - } - } - - return nil -} - -// Custom encoding for spans under the v1 trace protocol. -// The encoding of attributes is the combination of the meta, metrics, and metaStruct fields of the v0.4 protocol. -func encodeSpan(s *Span, e *msgp.Writer, p *payloadV1) error { - kv := keyValueList{ - {key: streamingKey{isString: false, idx: 1}, value: anyValue{valueType: StringValueType, value: s.service}}, // service - {key: streamingKey{isString: false, idx: 2}, value: anyValue{valueType: StringValueType, value: s.name}}, // name - {key: streamingKey{isString: false, idx: 3}, value: anyValue{valueType: StringValueType, value: s.resource}}, // resource - {key: streamingKey{isString: false, idx: 4}, value: anyValue{valueType: IntValueType, value: int64(s.spanID)}}, // spanID - {key: streamingKey{isString: false, idx: 5}, value: anyValue{valueType: IntValueType, value: int64(s.parentID)}}, // parentID - {key: streamingKey{isString: false, idx: 6}, value: anyValue{valueType: IntValueType, value: int64(s.start)}}, // start - {key: streamingKey{isString: false, idx: 7}, value: anyValue{valueType: IntValueType, value: int64(s.duration)}}, // duration - {key: streamingKey{isString: false, idx: 8}, value: anyValue{valueType: BoolValueType, value: (s.error != 0)}}, // error - true if span has error - {key: streamingKey{isString: false, idx: 10}, value: anyValue{valueType: StringValueType, value: s.spanType}}, // type - {key: streamingKey{isString: false, idx: 15}, value: anyValue{valueType: StringValueType, value: s.integration}}, // component - } + p.fields = n + p.header = make([]byte, 8) + p.updateHeader() - // encode meta attributes - attr := keyValueList{} - for k, v := range s.meta { - idx, err := p.encodeString(k) + var ( + o = p.buf + seen = newStringTable() + ) + for { + var ( + ref uint32 + ok bool + v string + ) + ref, o, err = msgp.ReadUint32Bytes(o) if err != nil { - idx = streamingKey{isString: true, stringValue: k} + break } - attr = append(attr, keyValue{key: idx, value: anyValue{valueType: StringValueType, value: v}}) - } - - // encode metric attributes - for k, v := range s.metrics { - idx, err := p.encodeString(k) - if err != nil { - idx = streamingKey{isString: true, stringValue: k} + v, ok, o = seen.Read(o) + if !ok { + err = fmt.Errorf("invalid data for field %d", ref) + break } - attr = append(attr, keyValue{key: idx, value: anyValue{valueType: FloatValueType, value: v}}) - } - - // encode metaStruct attributes - for k, v := range s.metaStruct { - idx, err := p.encodeString(k) - if err != nil { - idx = streamingKey{isString: true, stringValue: k} + switch ref { + case 2: // containerID + p.containerID = v + case 3: // languageName + p.languageName = v + case 4: // languageVersion + p.languageVersion = v + case 5: // tracerVersion + p.tracerVersion = v + case 6: // runtimeID + p.runtimeID = v + case 7: // env + p.env = v + case 8: // hostname + p.hostname = v + case 9: // appVersion + p.appVersion = v + default: + err = fmt.Errorf("unknown field %d", ref) + } + if len(o) == 0 || err != nil { + break } - attr = append(attr, keyValue{key: idx, value: anyValue{valueType: getAnyValueType(v), value: v}}) - } - - kv = append(kv, keyValue{key: streamingKey{isString: false, idx: 9}, value: anyValue{valueType: keyValueListType, value: attr}}) // attributes - - env, ok := s.meta["env"] - if ok { - kv = append(kv, keyValue{key: streamingKey{isString: false, idx: 13}, value: anyValue{valueType: StringValueType, value: env}}) // env - } - version, ok := s.meta["version"] - if ok { - kv = append(kv, keyValue{key: streamingKey{isString: false, idx: 14}, value: anyValue{valueType: StringValueType, value: version}}) // version - } - - err := kv.EncodeMsg(e, p) - if err != nil { - return err - } - - // spanLinks - err = encodeSpanLinks(s.spanLinks, e, p) - if err != nil { - return err } + return o, err +} - // spanEvents - return encodeSpanEvents(s.spanEvents, e, p) +// Close implements io.Closer +func (p *payloadV1) Close() error { + p.clear() + return nil } -// encodeString and decodeString handles encoding a string to the payload's string table. -// When writing a string: -// - use its index in the string table if it exists -// - otherwise, write the string into the message, then add the string at the next index -// Returns the index of the string in the string table, and an error if there is one -func (p *payloadV1) encodeString(s string) (streamingKey, error) { - sTable := p.strings - idx, ok := sTable.indices[s] - // if the string already exists in the table, use its index - if ok { - return streamingKey{isString: false, idx: idx}, nil - } +// Field accessors for backward compatibility - these delegate to the bitmap +func (p *payloadV1) ContainerID() string { return p.containerID } +func (p *payloadV1) LanguageName() string { return p.languageName } +func (p *payloadV1) LanguageVersion() string { return p.languageVersion } +func (p *payloadV1) TracerVersion() string { return p.tracerVersion } +func (p *payloadV1) RuntimeID() string { return p.runtimeID } +func (p *payloadV1) Env() string { return p.env } +func (p *payloadV1) Hostname() string { return p.hostname } +func (p *payloadV1) AppVersion() string { return p.appVersion } - // else, write the string into the table at the next index - // return an error to indicate that the string should be written to the msgp message - sTable.Add(s) - return streamingKey{isString: true, stringValue: s}, nil +func (p *payloadV1) SetContainerID(value string) { + p.containerID = value + p.setFields.Set(2) + p.fields++ } -// encodeSpanLinks encodes the span links into a msgp.Writer -// Span links are represented as an array of fixmaps (keyValueList) -func encodeSpanLinks(sl []SpanLink, e *msgp.Writer, p *payloadV1) error { - err := e.WriteInt32(11) // spanLinks - if err != nil { - return err - } - - // write the number of span links - err = e.WriteArrayHeader(uint32(len(sl))) - if err != nil { - return err - } +func (p *payloadV1) SetLanguageName(value string) { + p.languageName = value + p.setFields.Set(3) + p.fields++ +} - // represent each span link as a fixmap (keyValueList) and add it to an array - kv := arrayValue{} - for _, s := range sl { - slKeyValues := keyValueList{ - {key: streamingKey{isString: false, idx: 1}, value: anyValue{valueType: IntValueType, value: int64(s.TraceID)}}, // traceID - {key: streamingKey{isString: false, idx: 2}, value: anyValue{valueType: IntValueType, value: int64(s.SpanID)}}, // spanID - {key: streamingKey{isString: false, idx: 4}, value: anyValue{valueType: StringValueType, value: s.Tracestate}}, // tracestate - {key: streamingKey{isString: false, idx: 5}, value: anyValue{valueType: IntValueType, value: int64(s.Flags)}}, // flags - } +func (p *payloadV1) SetLanguageVersion(value string) { + p.languageVersion = value + p.setFields.Set(4) + p.fields++ +} - attr := keyValueList{} - // attributes - for k, v := range s.Attributes { - idx, err := p.encodeString(k) - if err != nil { - idx = streamingKey{isString: true, stringValue: k} - } - attr = append(attr, keyValue{key: idx, value: anyValue{valueType: getAnyValueType(v), value: v}}) - } - slKeyValues = append(slKeyValues, keyValue{key: streamingKey{isString: false, idx: 3}, value: anyValue{valueType: ArrayValueType, value: attr}}) // attributes - kv = append(kv, anyValue{valueType: keyValueListType, value: slKeyValues}) - } +func (p *payloadV1) SetTracerVersion(value string) { + p.tracerVersion = value + p.setFields.Set(5) + p.fields++ +} - for _, v := range kv { - err := v.EncodeMsg(e, p) - if err != nil { - return err - } - } - return nil +func (p *payloadV1) SetRuntimeID(value string) { + p.runtimeID = value + p.setFields.Set(6) + p.fields++ } -// encodeSpanEvents encodes the span events into a msgp.Writer -// Span events are represented as an array of fixmaps (keyValueList) -func encodeSpanEvents(se []spanEvent, e *msgp.Writer, p *payloadV1) error { - err := e.WriteInt32(12) // spanEvents - if err != nil { - return err - } +func (p *payloadV1) SetEnv(value string) { + p.env = value + p.setFields.Set(7) + p.fields++ +} - // write the number of span events - err = e.WriteArrayHeader(uint32(len(se))) - if err != nil { - return err - } +func (p *payloadV1) SetHostname(value string) { + p.hostname = value + p.setFields.Set(8) + p.fields++ +} - // represent each span event as a fixmap (keyValueList) and add it to an array - kv := arrayValue{} - for _, s := range se { - slKeyValues := keyValueList{ - {key: streamingKey{isString: false, idx: 1}, value: anyValue{valueType: IntValueType, value: int64(s.TimeUnixNano)}}, // time - {key: streamingKey{isString: false, idx: 2}, value: anyValue{valueType: StringValueType, value: s.Name}}, // name - } +func (p *payloadV1) SetAppVersion(value string) { + p.appVersion = value + p.setFields.Set(9) + p.fields++ +} - attr := keyValueList{} - // attributes - for k, v := range s.Attributes { - idx, err := p.encodeString(k) - if err != nil { - idx = streamingKey{isString: true, stringValue: k} - } - attr = append(attr, keyValue{key: idx, value: anyValue{valueType: getAnyValueType(v), value: v}}) +// detectStringOrUint32Format examines the first byte of MessagePack data +// to determine if it represents a string or uint32 format. +// Returns 0 if string, 1 if uint32, or -1 if invalid. +func detectStringOrUint32Format(firstByte byte) int8 { + switch firstByte { + // String formats + case 0xd9, 0xda, 0xdb: // str8, str16, str32 + return 0 + case 0xce: // uint32 + return 1 + default: + // Check for fixstr: high 3 bits should be 0b101 (0xa0) + if firstByte&0xe0 == 0xa0 { + return 0 } - slKeyValues = append(slKeyValues, keyValue{key: streamingKey{isString: false, idx: 3}, value: anyValue{valueType: ArrayValueType, value: attr}}) // attributes - kv = append(kv, anyValue{valueType: keyValueListType, value: slKeyValues}) - } - - for _, v := range kv { - err := v.EncodeMsg(e, p) - if err != nil { - return err + // Check for positive fixint: high bit should be 0 (values 0-127) + if firstByte&0x80 == 0 { + return 1 } + return -1 } - return nil } -func getAnyValueType(v any) int { - switch v.(type) { - case string: - return StringValueType - case bool: - return BoolValueType - case float64: - return FloatValueType - case float32: - return FloatValueType - case []byte: - return BytesValueType - } - return IntValueType -} +type encodableString string -func (p *payloadV1) Decode(b []byte) ([]byte, error) { - if p.strings == nil { - p.strings = newStringTable() - } +func (es encodableString) encode(buf []byte) []byte { + return msgp.AppendString(buf, string(es)) +} - fields, o, err := msgp.ReadArrayHeaderBytes(b) +func (es *encodableString) decode(buf []byte) ([]byte, error) { + v, o, err := msgp.ReadStringBytes(buf) if err != nil { return o, err } + *es = encodableString(v) + return o, nil +} - for fields > 0 { - fields-- - - f, o, err := msgp.ReadInt32Bytes(o) - if err != nil { - return o, err - } - - switch f { - // we don't care for the string table, so we don't decode it - case 2: // containerID - p.containerID, o, err = DecodeStreamingString(o, p.strings) - if err != nil { - return o, err - } - - case 3: // languageName - p.languageName, o, err = DecodeStreamingString(o, p.strings) - if err != nil { - return o, err - } - - case 4: // languageVersion - p.languageVersion, o, err = DecodeStreamingString(o, p.strings) - if err != nil { - return o, err - } - - case 5: // tracerVersion - p.tracerVersion, o, err = DecodeStreamingString(o, p.strings) - if err != nil { - return o, err - } - - case 6: // runtimeID - p.runtimeID, o, err = DecodeStreamingString(o, p.strings) - if err != nil { - return o, err - } - - case 7: // env - p.env, o, err = DecodeStreamingString(o, p.strings) - if err != nil { - return o, err - } - - case 8: // hostname - p.hostname, o, err = DecodeStreamingString(o, p.strings) - if err != nil { - return o, err - } +type index uint32 - case 9: // appVersion - p.appVersion, o, err = DecodeStreamingString(o, p.strings) - if err != nil { - return o, err - } - case 10: // attributes - p.attributes, o, err = DecodeKeyValueList(o, p.strings) - if err != nil { - return o, err - } - case 11: // chunks - p.chunks, o, err = DecodeTraceChunks(o, p.strings) - if err != nil { - return o, err - } - } - } - return o, nil +func (i index) encode(buf []byte) []byte { + return msgp.AppendUint32(buf, uint32(i)) } -func DecodeStringTable(b []byte, strings *stringTable) ([]byte, error) { - len, o, err := msgp.ReadBytesHeader(b) +func (i *index) decode(buf []byte) ([]byte, error) { + v, o, err := msgp.ReadUint32Bytes(buf) if err != nil { - return nil, err + return o, err } + *i = index(v) + return o, nil +} - for len > 0 { - len-- - str, o, err := msgp.ReadStringBytes(o) - if err != nil { - return o, err - } +type bitmap uint16 - // if we've seen the string before, skip - if _, ok := strings.indices[str]; ok { - continue - } +func (bm *bitmap) Set(i uint32) { + *bm |= (1 << i) +} - strings.Add(str) - } - return o, nil +func (bm bitmap) Has(i uint32) bool { + return bm&(1< 0 { - fields-- - - f, o, err := msgp.ReadUint32Bytes(o) - if err != nil { - return ret, o, err - } - - switch f { - case 1: // priority - s, o, err := msgp.ReadInt32Bytes(o) - if err != nil { - return ret, o, err - } - tc.priority = s - case 2: // origin - s, o, err := msgp.ReadStringBytes(o) - if err != nil { - return ret, o, err - } - tc.origin = s - case 3: // attributes - kv, o, err := DecodeKeyValueList(o, strings) - if err != nil { - return ret, o, err - } - tc.attributes = kv - case 4: // spans - s, o, err := DecodeSpanList(o, strings) - if err != nil { - return ret, o, err - } - tc.spans = s - case 5: // droppedTrace - s, o, err := msgp.ReadBoolBytes(o) - if err != nil { - return ret, o, err - } - tc.droppedTrace = s - case 6: // traceID - s, o, err := msgp.ReadBytesBytes(o, nil) - if err != nil { - return ret, o, err - } - tc.traceID = []byte(s) - case 7: // samplingMechanism - s, o, err := msgp.ReadStringBytes(o) - if err != nil { - return ret, o, err - } - tc.samplingMechanism = s - } - } - ret[i] = tc - } - return ret, o, nil +// AnyValue is a representation of the `any` value. It can take the following types: +// - uint32 +// - bool +// - float64 +// - int64 +// - uint8 +// intValue(5) - 0x405 (4 indicates this is an int AnyType, then 5 is encoded using positive fixed int format) +// stringValue(“a”) - 0x1a161 (1 indicates this is a string, then “a” is encoded using fixstr 0xa161) +// stringValue(2) - 0x102 (1 indicates this is a string, then a positive fixed int of 2 refers the 2nd index of the string table) +type anyValue struct { + valueType int + value interface{} } -func DecodeSpanList(b []byte, strings *stringTable) (spanList, []byte, error) { - len, o, err := msgp.ReadArrayHeaderBytes(b) - if err != nil { - return nil, o, err - } - ret := make([]*Span, len) - for i := range len { - ret[i], o, err = DecodeSpan(o, strings) - if err != nil { - return nil, o, err - } - } - return ret, o, nil +const ( + StringValueType = iota + 1 // string or uint -- 1 + BoolValueType // boolean -- 2 + FloatValueType // float64 -- 3 + IntValueType // int64 -- 4 + BytesValueType // []uint8 -- 5 + ArrayValueType // []AnyValue -- 6 + keyValueListType // []keyValue -- 7 +) + +// keyValue is made up of the key and an AnyValue (the type of the value and the value itself) +// The key is either a uint32 index into the string table or a string value. +type keyValue struct { + key encodableString + value anyValue } -func DecodeSpan(b []byte, strings *stringTable) (*Span, []byte, error) { - sp := Span{} - fields, o, err := msgp.ReadMapHeaderBytes(b) - if err != nil { - return &sp, o, err - } +type keyValueList []keyValue - for fields > 0 { - fields-- +// traceChunk represents a list of spans with the same trace ID, +// i.e. a chunk of a trace +type traceChunk struct { + // the sampling priority of the trace + priority int32 - f, o, err := msgp.ReadUint32Bytes(o) - if err != nil { - return &sp, o, err - } + // the optional string origin ("lambda", "rum", etc.) of the trace chunk + origin string - switch f { - case 1: // service - st, o, err := msgp.ReadStringBytes(o) - if err != nil { - return &sp, o, err - } - sp.service = st - case 2: // name - st, o, err := msgp.ReadStringBytes(o) - if err != nil { - return &sp, o, err - } - sp.name = st - case 3: // resource - st, o, err := msgp.ReadStringBytes(o) - if err != nil { - return &sp, o, err - } - sp.resource = st - case 4: // spanID - i, o, err := msgp.ReadInt64Bytes(o) - if err != nil { - return &sp, o, err - } - sp.spanID = uint64(i) - case 5: // parentID - i, o, err := msgp.ReadInt64Bytes(o) - if err != nil { - return &sp, o, err - } - sp.parentID = uint64(i) - case 6: // start - i, o, err := msgp.ReadInt64Bytes(o) - if err != nil { - return &sp, o, err - } - sp.start = i - case 7: // duration - i, o, err := msgp.ReadInt64Bytes(o) - if err != nil { - return &sp, o, err - } - sp.duration = i - case 8: // error - i, o, err := msgp.ReadBoolBytes(o) - if err != nil { - return &sp, o, err - } - if i { - sp.error = 1 - } else { - sp.error = 0 - } - case 9: // attributes - kv, o, err := DecodeKeyValueList(o, strings) - if err != nil { - return &sp, o, err - } - for k, v := range kv { - key := strings.strings[k] - sp.SetTag(key, v.value.value) - } - case 10: // type - st, o, err := msgp.ReadStringBytes(o) - if err != nil { - return &sp, o, err - } - sp.spanType = st - case 11: // spanLinks - sl, o, err := DecodeSpanLinks(o, strings) - if err != nil { - return &sp, o, err - } - sp.spanLinks = sl - case 12: // spanEvents - se, o, err := DecodeSpanEvents(o, strings) - if err != nil { - return &sp, o, err - } - sp.spanEvents = se - case 13: // env - s, o, err := msgp.ReadStringBytes(o) - if err != nil { - return &sp, o, err - } - sp.SetTag(ext.Environment, s) - case 14: // version - s, o, err := msgp.ReadStringBytes(o) - if err != nil { - return &sp, o, err - } - sp.setMeta(ext.Version, s) - case 15: // component - s, o, err := msgp.ReadStringBytes(o) - if err != nil { - return &sp, o, err - } - sp.integration = s - } - } - return &sp, nil, nil -} + // a collection of key to value pairs common in all `spans` + attributes keyValueList -func DecodeSpanLinks(b []byte, strings *stringTable) ([]SpanLink, []byte, error) { - numSpanLinks, o, err := msgp.ReadArrayHeaderBytes(b) - if err != nil { - return nil, o, err - } + // a list of spans in this chunk + spans spanList - ret := make([]SpanLink, numSpanLinks) - for i := range numSpanLinks { - sl := SpanLink{} - fields, o, err := msgp.ReadMapHeaderBytes(o) - if err != nil { - return ret, o, err - } - for fields > 0 { - fields-- - - f, o, err := msgp.ReadUint32Bytes(o) - if err != nil { - return ret, o, err - } - - switch f { - case 1: // traceID - s, o, err := msgp.ReadInt64Bytes(o) - if err != nil { - return ret, o, err - } - sl.TraceID = uint64(s) - case 2: // spanID - s, o, err := msgp.ReadInt64Bytes(o) - if err != nil { - return ret, o, err - } - sl.SpanID = uint64(s) - case 3: // attributes - kv, o, err := DecodeKeyValueList(o, strings) - if err != nil { - return ret, o, err - } - for k, v := range kv { - key := strings.strings[k] - s, ok := v.value.value.(string) - if !ok { - err := msgp.WrapError(fmt.Errorf("expected string value type for span link attributes, got %T", v.value.value)) - return ret, o, err - } - sl.Attributes[key] = s - } - case 4: // tracestate - s, o, err := msgp.ReadStringBytes(o) - if err != nil { - return ret, o, err - } - sl.Tracestate = s - case 5: // flags - s, o, err := msgp.ReadUint32Bytes(o) - if err != nil { - return ret, o, err - } - sl.Flags = s - } - } - ret[i] = sl - } - return ret, o, nil -} + // whether the trace only contains analyzed spans + // (not required by tracers and set by the agent) + droppedTrace bool -func DecodeSpanEvents(b []byte, strings *stringTable) ([]spanEvent, []byte, error) { - numSpanEvents, o, err := msgp.ReadArrayHeaderBytes(b) - if err != nil { - return nil, o, err - } - ret := make([]spanEvent, numSpanEvents) - for i := range numSpanEvents { - se := spanEvent{} - fields, o, err := msgp.ReadMapHeaderBytes(o) - if err != nil { - return ret, o, err - } - for fields > 0 { - fields-- - - f, o, err := msgp.ReadUint32Bytes(o) - if err != nil { - return ret, o, err - } - - switch f { - case 1: // time - s, o, err := msgp.ReadInt64Bytes(o) - if err != nil { - return ret, o, err - } - se.TimeUnixNano = uint64(s) - case 2: // name - s, o, err := msgp.ReadStringBytes(o) - if err != nil { - return ret, o, err - } - se.Name = s - case 4: // attributes - kv, o, err := DecodeKeyValueList(o, strings) - if err != nil { - return ret, o, err - } - for k, v := range kv { - key := strings.strings[k] - switch v.value.valueType { - case StringValueType: - se.Attributes[key] = &spanEventAttribute{ - Type: spanEventAttributeTypeString, - StringValue: v.value.value.(string), - } - case BoolValueType: - se.Attributes[key] = &spanEventAttribute{ - Type: spanEventAttributeTypeBool, - BoolValue: v.value.value.(bool), - } - case IntValueType: - se.Attributes[key] = &spanEventAttribute{ - Type: spanEventAttributeTypeInt, - IntValue: v.value.value.(int64), - } - case FloatValueType: - se.Attributes[key] = &spanEventAttribute{ - Type: spanEventAttributeTypeDouble, - DoubleValue: v.value.value.(float64), - } - case ArrayValueType: - se.Attributes[key] = &spanEventAttribute{ - Type: spanEventAttributeTypeArray, - ArrayValue: v.value.value.(*spanEventArrayAttribute), - } - default: - err := msgp.WrapError(fmt.Errorf("unexpected value type not supported by span events: %T", v.value.value)) - return ret, o, err - } - } - } - } - ret[i] = se - } - return ret, nil, nil + // the ID of the trace to which all spans in this chunk belong + traceID []byte + + // the optional string decision maker (previously span tag _dd.p.dm) + samplingMechanism string }