diff --git a/ddtrace/tracer/transport.go b/ddtrace/tracer/transport.go index 1046392dc9..8b8e237e55 100644 --- a/ddtrace/tracer/transport.go +++ b/ddtrace/tracer/transport.go @@ -127,10 +127,23 @@ func (t *httpTransport) sendStats(p *pb.ClientStatsPayload, tracerObfuscationVer if err := msgp.Encode(&buf, p); err != nil { return err } - req, err := http.NewRequest("POST", t.statsURL, &buf) + body := buf.Bytes() + + req, err := http.NewRequest("POST", t.statsURL, bytes.NewReader(body)) if err != nil { return err } + + // by providing GetBody and a zero length slice for the Idempotency-Key + // http header, we get free retries in the face of network errors, such as + // when datadog-agent is much more aggressive about closing idle connections + // than we are + req.GetBody = func() (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader(body)), nil + } + req.ContentLength = int64(len(body)) + req.Header["Idempotency-Key"] = []string{} + for header, value := range t.headers { req.Header.Set(header, value) } diff --git a/ddtrace/tracer/transport_test.go b/ddtrace/tracer/transport_test.go index 0bf5b67e9b..4a16041d3e 100644 --- a/ddtrace/tracer/transport_test.go +++ b/ddtrace/tracer/transport_test.go @@ -6,6 +6,7 @@ package tracer import ( + "context" "fmt" "io" "net" @@ -509,3 +510,108 @@ func TestDefaultHeaders(t *testing.T) { err = trc.config.transport.sendStats(&pb.ClientStatsPayload{}, 1) assert.NoError(err) } + +// eofConn has tightly coupled semantics to how http.Transport uses a connection +// namely, it permits returning an io.EOF from Read, *after* Write has returned. 
+type eofConn struct {
+	// rcalls and wcalls count the Read and Write calls made on this conn.
+	rcalls int
+	wcalls int
+	// ch carries each Write's verdict to a Read: true means read from the
+	// wrapped conn, false means report io.EOF. It is unbuffered, so a Write
+	// does not return until a Read has received its verdict, and a Read blocks
+	// until a Write has sent one.
+	ch chan bool
+
+	// embedded so eofConn satisfies net.Conn; only Read and Write are
+	// overridden, every other method goes straight to the wrapped conn
+	net.Conn
+}
+
+// NewEOFConn wraps the result of a dial in an eofConn. It accepts the dialer's
+// (conn, err) pair directly so a call such as d.DialContext(...) can be passed
+// as its argument list. A dial error is returned unchanged together with a nil
+// *eofConn, so callers never receive a wrapper around a nil net.Conn.
+func NewEOFConn(conn net.Conn, err error) (*eofConn, error) {
+	if err != nil {
+		return nil, err
+	}
+	return &eofConn{ch: make(chan bool), Conn: conn}, nil
+}
+
+// Read waits for a Write to send its verdict on c.ch, then either delegates to
+// the wrapped conn or returns io.EOF without touching it.
+func (c *eofConn) Read(p []byte) (int, error) {
+	c.rcalls++
+	if passthrough := <-c.ch; !passthrough {
+		return 0, io.EOF
+	}
+	return c.Conn.Read(p)
+}
+
+// Write forwards p to the wrapped conn and then tells a Read how to behave:
+// the Read that receives the second Write's verdict gets io.EOF, every other
+// Read passes through to the wrapped conn.
+func (c *eofConn) Write(p []byte) (int, error) {
+	c.wcalls++
+	n, err := c.Conn.Write(p)
+	c.ch <- c.wcalls != 2
+	return n, err
+}
+
+// TestSendStatsNetworkErrorRetry ensures that httpTransport.sendStats will
+// retry in the face of an awful edge case race condition. datadog-agent has an
+// effective idle timeout of 5 seconds, and the tracer's default is 90 seconds.
+// The agent can close the `net.Conn` `httpTransport` is using _while_ the
+// request is being sent, resulting in a `(net.Conn).Read` getting `io.EOF`.
+// `http.Transport` will retry idempotent requests when that network error
+// occurs, but `sendStats` was not considered idempotent until this
+// commit/test/PR.
+func TestSendStatsNetworkErrorRetry(t *testing.T) {
+	// disable instrumentation telemetry to prevent flaky number of requests
+	t.Setenv("DD_INSTRUMENTATION_TELEMETRY_ENABLED", "false")
+	t.Setenv("DD_TRACE_STARTUP_LOGS", "0")
+	assert := assert.New(t)
+
+	// handlerHits counts every request that reaches the test server
+	var handlerHits int
+	srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
+		handlerHits++
+		if r.Method == http.MethodGet {
+			return
+		}
+		// make sure we get the Content-Length header on the POST
+		cl := r.Header.Get("Content-Length")
+		assert.NotZero(cl)
+		rw.WriteHeader(http.StatusOK)
+		_, _ = rw.Write([]byte("ok"))
+	}))
+	defer srv.Close()
+
+	u, err := url.Parse(srv.URL)
+	if err != nil {
+		// u is needed below; continuing would only dereference a nil URL
+		t.Fatalf("parsing test server URL: %v", err)
+	}
+	c := &http.Client{}
+	rt := wrapRecordingRoundTripper(c)
+
+	var d net.Dialer
+	var conns []*eofConn
+	var transportDials int
+
+	rt.rt = &http.Transport{
+		DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
+			transportDials++
+			con, err := NewEOFConn(d.DialContext(ctx, network, addr))
+			if err != nil {
+				// return an untyped nil: a nil *eofConn stored in a net.Conn
+				// interface would compare non-nil
+				return nil, err
+			}
+			// only successfully dialed connections are tracked
+			conns = append(conns, con)
+			return con, nil
+		},
+	}
+
+	trc, err := newTracer(WithAgentTimeout(2), WithAgentAddr(u.Host), WithHTTPClient(c))
+	if err != nil {
+		t.Fatalf("creating tracer: %v", err)
+	}
+	// deferred only after the error check, so Stop is never called on the
+	// result of a failed newTracer
+	defer trc.Stop()
+
+	err = trc.config.transport.sendStats(&pb.ClientStatsPayload{}, 1)
+	assert.NoError(err)
+
+	assert.Equal(3, handlerHits) // we expect more hits than requests due to idempotency retries
+	assert.Equal(2, transportDials)
+
+	reqsOK := assert.Len(rt.reqs, 2)
+	// we expect to use two connections
+	connsOK := assert.Len(conns, 2)
+	if !reqsOK || !connsOK {
+		// assert does not stop the test on failure; bail out here rather than
+		// panic on the indexing below
+		return
+	}
+
+	// first connection gets used twice, the second time's failures result in
+	// the second connection being established
+	assert.Equal(2, conns[0].rcalls)
+	assert.Equal(2, conns[0].wcalls)
+	assert.Equal(1, conns[1].rcalls)
+	assert.Equal(1, conns[1].wcalls)
+
+	assert.Contains(rt.reqs[0].URL.Path, "/info")
+	assert.Contains(rt.reqs[1].URL.Path, "/stats")
+}