-
Notifications
You must be signed in to change notification settings - Fork 488
feat: v1 trace protocol implementation #3947
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 15 commits
f5d87c3
25440fd
d0635e4
478ef22
230633a
c0f2493
1e941ac
dc00fd3
98d30ce
0ad7dd1
326a244
ea80c7a
af26f15
ded707d
5124410
bb1baaf
f071e05
03abab3
22e70ec
68ab0d8
23c72cf
a12502c
08b1b97
60165d3
11a4ad9
6d10c7a
46a6cb9
170a42c
7d6c3cc
5881a50
dc26c98
9030925
269988e
8c7bef3
8f06c7b
80dbd54
b64c68f
8908891
e739a34
3a6e7fa
f3f86ab
91c5929
58d2f62
82c805b
1392569
bbb62d2
d226900
72fae1d
7d4c9cf
a989bde
e2ef0d5
0067018
5e1633d
888c82c
c4b126d
d931d7e
a8b15f2
2ab2051
092f607
639f9b2
581c994
0487a5a
8813fa5
ff24c1f
50724d8
883e3f2
bc024ce
e759d0e
e4ae913
da2fd94
813031f
6ad1afe
7fc1f29
f2ecb71
ef59f66
8293cd1
bff64ad
c398de9
5263a6f
6e71dbd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,13 +6,8 @@ | |
| package tracer | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "encoding/binary" | ||
| "io" | ||
| "sync" | ||
| "sync/atomic" | ||
|
|
||
| "github.com/tinylib/msgp/msgp" | ||
| ) | ||
|
|
||
| // payloadStats contains the statistics of a payload. | ||
|
|
@@ -51,190 +46,29 @@ | |
| payloadReader | ||
| } | ||
|
|
||
| // unsafePayload is a wrapper on top of the msgpack encoder which allows constructing an | ||
| // encoded array by pushing its entries sequentially, one at a time. It basically | ||
| // allows us to encode as we would with a stream, except that the contents of the stream | ||
| // can be read as a slice by the msgpack decoder at any time. It follows the guidelines | ||
| // from the msgpack array spec: | ||
| // https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family | ||
| // | ||
| // unsafePayload implements io.Reader and can be used with the decoder directly. | ||
| // | ||
| // unsafePayload is not safe for concurrent use. | ||
| // | ||
| // unsafePayload is meant to be used only once and eventually dismissed with the | ||
| // single exception of retrying failed flush attempts. | ||
| // | ||
| // ⚠️ Warning! | ||
| // | ||
| // The payload should not be reused for multiple sets of traces. Resetting the | ||
| // payload for re-use requires the transport to wait for the HTTP package to | ||
| // Close the request body before attempting to re-use it again! This requires | ||
| // additional logic to be in place. See: | ||
| // | ||
| // • https://github.com/golang/go/blob/go1.16/src/net/http/client.go#L136-L138 | ||
| // • https://github.com/DataDog/dd-trace-go/pull/475 | ||
| // • https://github.com/DataDog/dd-trace-go/pull/549 | ||
| // • https://github.com/DataDog/dd-trace-go/pull/976 | ||
| type unsafePayload struct { | ||
| // header specifies the first few bytes in the msgpack stream | ||
| // indicating the type of array (fixarray, array16 or array32) | ||
| // and the number of items contained in the stream. | ||
| header []byte | ||
|
|
||
| // off specifies the current read position on the header. | ||
| off int | ||
|
|
||
| // count specifies the number of items in the stream. | ||
| count uint32 | ||
|
|
||
| // buf holds the sequence of msgpack-encoded items. | ||
| buf bytes.Buffer | ||
|
|
||
| // reader is used for reading the contents of buf. | ||
| reader *bytes.Reader | ||
|
|
||
| // protocolVersion specifies the trace protocolVersion to use. | ||
| protocolVersion float64 | ||
| } | ||
|
|
||
| var _ io.Reader = (*unsafePayload)(nil) | ||
|
|
||
| // newUnsafePayload returns a ready to use unsafe payload. | ||
| func newUnsafePayload(protocol float64) *unsafePayload { | ||
| p := &unsafePayload{ | ||
| header: make([]byte, 8), | ||
| off: 8, | ||
| protocolVersion: protocol, | ||
| } | ||
| return p | ||
| } | ||
|
|
||
| // push pushes a new item into the stream. | ||
| func (p *unsafePayload) push(t []*Span) (stats payloadStats, err error) { | ||
| sl := spanList(t) | ||
| p.buf.Grow(sl.Msgsize()) | ||
| if err := msgp.Encode(&p.buf, sl); err != nil { | ||
| return payloadStats{}, err | ||
| // newPayload returns a ready to use payload. | ||
| func newPayload(protocol float64) payload { | ||
| if protocol == traceProtocolV1 { | ||
| return &safePayload{ | ||
| p: newPayloadV1(), | ||
|
Check failure on line 53 in ddtrace/tracer/payload.go
|
||
| } | ||
| } | ||
| p.recordItem() | ||
| return p.stats(), nil | ||
| } | ||
|
|
||
| // itemCount returns the number of items available in the stream. | ||
| func (p *unsafePayload) itemCount() int { | ||
| return int(atomic.LoadUint32(&p.count)) | ||
| } | ||
|
|
||
| // size returns the payload size in bytes. After the first read the value becomes | ||
| // inaccurate by up to 8 bytes. | ||
| func (p *unsafePayload) size() int { | ||
| return p.buf.Len() + len(p.header) - p.off | ||
| } | ||
|
|
||
| // reset sets up the payload to be read a second time. It maintains the | ||
| // underlying byte contents of the buffer. reset should not be used in order to | ||
| // reuse the payload for another set of traces. | ||
| func (p *unsafePayload) reset() { | ||
| p.updateHeader() | ||
| if p.reader != nil { | ||
| p.reader.Seek(0, 0) | ||
| return &safePayload{ | ||
| p: newPayloadV04(), | ||
| } | ||
| } | ||
|
|
||
| // clear empties the payload buffers. | ||
| func (p *unsafePayload) clear() { | ||
| p.buf = bytes.Buffer{} | ||
| p.reader = nil | ||
| } | ||
|
|
||
| // https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family | ||
| const ( | ||
| msgpackArrayFix byte = 144 // up to 15 items | ||
| msgpackArray16 byte = 0xdc // up to 2^16-1 items, followed by size in 2 bytes | ||
| msgpackArray32 byte = 0xdd // up to 2^32-1 items, followed by size in 4 bytes | ||
| ) | ||
|
|
||
| // updateHeader updates the payload header based on the number of items currently | ||
| // present in the stream. | ||
| func (p *unsafePayload) updateHeader() { | ||
| n := uint64(atomic.LoadUint32(&p.count)) | ||
| switch { | ||
| case n <= 15: | ||
| p.header[7] = msgpackArrayFix + byte(n) | ||
| p.off = 7 | ||
| case n <= 1<<16-1: | ||
| binary.BigEndian.PutUint64(p.header, n) // writes 2 bytes | ||
| p.header[5] = msgpackArray16 | ||
| p.off = 5 | ||
| default: // n <= 1<<32-1 | ||
| binary.BigEndian.PutUint64(p.header, n) // writes 4 bytes | ||
| p.header[3] = msgpackArray32 | ||
| p.off = 3 | ||
| } | ||
| } | ||
|
|
||
| // Close implements io.Closer | ||
| func (p *unsafePayload) Close() error { | ||
| return nil | ||
| } | ||
|
|
||
| // Read implements io.Reader. It reads from the msgpack-encoded stream. | ||
| func (p *unsafePayload) Read(b []byte) (n int, err error) { | ||
| if p.off < len(p.header) { | ||
| // reading header | ||
| n = copy(b, p.header[p.off:]) | ||
| p.off += n | ||
| return n, nil | ||
| } | ||
| if p.reader == nil { | ||
| p.reader = bytes.NewReader(p.buf.Bytes()) | ||
| } | ||
| return p.reader.Read(b) | ||
| } | ||
|
|
||
| // Write implements io.Writer. It writes data directly to the buffer. | ||
| func (p *unsafePayload) Write(data []byte) (n int, err error) { | ||
| return p.buf.Write(data) | ||
| } | ||
|
|
||
| // grow grows the buffer to ensure it can accommodate n more bytes. | ||
| func (p *unsafePayload) grow(n int) { | ||
| p.buf.Grow(n) | ||
| } | ||
|
|
||
| // recordItem records that an item was added and updates the header. | ||
| func (p *unsafePayload) recordItem() { | ||
| atomic.AddUint32(&p.count, 1) | ||
| p.updateHeader() | ||
| } | ||
|
|
||
| // stats returns the current stats of the payload. | ||
| func (p *unsafePayload) stats() payloadStats { | ||
| return payloadStats{ | ||
| size: p.size(), | ||
| itemCount: int(atomic.LoadUint32(&p.count)), | ||
| } | ||
| } | ||
|
|
||
| // protocol returns the protocol version of the payload. | ||
| func (p *unsafePayload) protocol() float64 { | ||
| return p.protocolVersion | ||
| } | ||
|
|
||
| var _ io.Reader = (*safePayload)(nil) | ||
|
|
||
| // newPayload returns a ready to use thread-safe payload. | ||
| func newPayload(protocol float64) payload { | ||
| return &safePayload{ | ||
| p: newUnsafePayload(protocol), | ||
| } | ||
| } | ||
|
|
||
| // safePayload provides a thread-safe wrapper around unsafePayload. | ||
| // safePayload provides a thread-safe wrapper around payload. | ||
| type safePayload struct { | ||
| mu sync.RWMutex | ||
| p *unsafePayload | ||
| p payload | ||
|
Comment on lines
-251
to
+77
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm slightly confused by
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mtoffl01 |
||
| } | ||
|
|
||
| // push pushes a new item into the stream in a thread-safe manner. | ||
|
|
@@ -246,8 +80,7 @@ | |
|
|
||
| // itemCount returns the number of items available in the stream in a thread-safe manner. | ||
| func (sp *safePayload) itemCount() int { | ||
| // Use direct atomic access for better performance - no mutex needed | ||
| return int(atomic.LoadUint32(&sp.p.count)) | ||
| return sp.p.itemCount() | ||
hannahkm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| // size returns the payload size in bytes in a thread-safe manner. | ||
|
|
@@ -319,3 +152,29 @@ | |
| // Protocol is immutable after creation - no lock needed | ||
| return sp.p.protocol() | ||
| } | ||
|
|
||
| // traceChunk represents a list of spans with the same trace ID, | ||
| // i.e. a chunk of a trace | ||
| type traceChunk struct { | ||
| // the sampling priority of the trace | ||
| priority int32 `msg:"priority"` | ||
|
|
||
| // the optional string origin ("lambda", "rum", etc.) of the trace chunk | ||
| origin string `msg:"origin,omitempty"` | ||
|
|
||
| // a collection of key to value pairs common in all `spans` | ||
| attributes map[uint32]anyValue `msg:"attributes,omitempty"` | ||
|
|
||
| // a list of spans in this chunk | ||
| spans spanListV1 `msg:"spans,omitempty"` | ||
|
|
||
| // whether the trace only contains analyzed spans | ||
| // (not required by tracers and set by the agent) | ||
| droppedTrace bool `msg:"droppedTrace"` | ||
|
|
||
| // the ID of the trace to which all spans in this chunk belong | ||
| traceID [16]byte `msg:"traceID"` | ||
|
|
||
| // the optional string decision maker (previously span tag _dd.p.dm) | ||
| samplingMechanism string `msg:"samplingMechanism,omitempty"` | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.