Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion ddtrace/tracer/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,23 @@ func (t *httpTransport) sendStats(p *pb.ClientStatsPayload, tracerObfuscationVer
if err := msgp.Encode(&buf, p); err != nil {
return err
}
req, err := http.NewRequest("POST", t.statsURL, &buf)
body := buf.Bytes()

req, err := http.NewRequest("POST", t.statsURL, bytes.NewReader(body))
if err != nil {
return err
}

// by providing GetBody and a zero length slice for the Idempotency-Key
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like it could work well here, but wouldn't this mean all these requests are considered idempotent so even on a network error where the agent received and processed the request this would cause a retry of the same request? I took a look at the net/http docs for clarity here but they aren't clarifying this exact case for me

// http header, we get free retries in the face of network errors, such as
// when datadog-agent is much more aggressive about closing idle connections
// than we are
req.GetBody = func() (io.ReadCloser, error) {
return io.NopCloser(bytes.NewReader(body)), nil
}
req.ContentLength = int64(len(body))
req.Header["Idempotency-Key"] = []string{}

for header, value := range t.headers {
req.Header.Set(header, value)
}
Expand Down
106 changes: 106 additions & 0 deletions ddtrace/tracer/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package tracer

import (
"context"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -509,3 +510,108 @@ func TestDefaultHeaders(t *testing.T) {
err = trc.config.transport.sendStats(&pb.ClientStatsPayload{}, 1)
assert.NoError(err)
}

// eofConn has tightly coupled semantics to how http.Transport uses a connection
// namely, it permits returning an io.EOF from Read, *after* Write has returned.
type eofConn struct {
rcalls int
wcalls int
ch chan bool

// included to satisfy net.Conn but not all methods are implemented
net.Conn
}

func NewEOFConn(conn net.Conn, err error) (*eofConn, error) {
return &eofConn{
0, 0, make(chan bool, 0), conn,
}, err
}

func (c *eofConn) Read(p []byte) (int, error) {
c.rcalls++
if passthrough := <-c.ch; passthrough {
return c.Conn.Read(p)
}
return 0, io.EOF
}

func (c *eofConn) Write(p []byte) (int, error) {
c.wcalls++
n, err := c.Conn.Write(p)
if c.wcalls == 2 {
c.ch <- false
} else {
c.ch <- true
}
return n, err
}

// TestSendStatsNetworkErrorRetry ensures that httpTransport.sendStats will
// retry in the face of an awful edge case race condition. datadog-agent has an
// effective idle timeout of 5 seconds, and the tracer's default is 90 seconds.
// The agent can close the `net.Conn` `httpTransport` is using _while_ the
// request is being sent, resulting in a `(net.Conn).Read` getting `io.EOF`.
// `http.Transport` will retry idempotent requests when that network error
// occurs, but `sendStats` was not considered idempotent until this
// commit/test/PR.
func TestSendStatsNetworkErrorRetry(t *testing.T) {
// disable instrumentation telemetry to prevent flaky number of requests
t.Setenv("DD_INSTRUMENTATION_TELEMETRY_ENABLED", "false")
t.Setenv("DD_TRACE_STARTUP_LOGS", "0")
assert := assert.New(t)

var handlerHits int
srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
handlerHits++
if r.Method == http.MethodGet {
return
}
// make sure we get the Content-Length header on the PUT
cl := r.Header.Get("Content-Length")
assert.NotZero(cl)
rw.WriteHeader(http.StatusOK)
_, _ = rw.Write([]byte("ok"))
}))
defer srv.Close()

u, err := url.Parse(srv.URL)
assert.NoError(err)
c := &http.Client{}
rt := wrapRecordingRoundTripper(c)

var d net.Dialer
var conns []*eofConn
var transportDials int

rt.rt = &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
transportDials++
con, err := NewEOFConn(d.DialContext(ctx, network, addr))
conns = append(conns, con)
return con, err
},
}

trc, err := newTracer(WithAgentTimeout(2), WithAgentAddr(u.Host), WithHTTPClient(c))
defer trc.Stop()
assert.NoError(err)

err = trc.config.transport.sendStats(&pb.ClientStatsPayload{}, 1)
assert.NoError(err)
assert.Len(rt.reqs, 2)

// we expect to use two connections
assert.Len(conns, 2)
// first connection gets used twice, the second time's failures result in
// the second connection being established
assert.Equal(2, conns[0].rcalls)
assert.Equal(2, conns[0].wcalls)
assert.Equal(1, conns[1].rcalls)
assert.Equal(1, conns[1].wcalls)

assert.Contains(rt.reqs[0].URL.Path, "/info")
assert.Contains(rt.reqs[1].URL.Path, "/stats")
assert.Equal(handlerHits, 3) // we expect more hits than requests due to idempotency retries
assert.Equal(transportDials, 2)
}
Loading