From eefe08eafeae6225d2970b7e06acf069757dfd22 Mon Sep 17 00:00:00 2001 From: Mikayla Toffler Date: Tue, 9 Dec 2025 15:33:04 -0500 Subject: [PATCH 1/2] migrate sendRetries, and fix flaky writer test --- ddtrace/tracer/civisibility_writer.go | 6 +++--- ddtrace/tracer/civisibility_writer_test.go | 3 ++- ddtrace/tracer/option.go | 6 +----- ddtrace/tracer/option_test.go | 4 ++-- ddtrace/tracer/telemetry.go | 2 +- ddtrace/tracer/writer.go | 5 +++-- ddtrace/tracer/writer_test.go | 9 +++++++-- internal/config/config.go | 17 +++++++++++++++++ internal/env/supported_configurations.gen.go | 1 + internal/env/supported_configurations.json | 3 +++ 10 files changed, 40 insertions(+), 16 deletions(-) diff --git a/ddtrace/tracer/civisibility_writer.go b/ddtrace/tracer/civisibility_writer.go index 038e8e6089..0af5d95032 100644 --- a/ddtrace/tracer/civisibility_writer.go +++ b/ddtrace/tracer/civisibility_writer.go @@ -112,8 +112,8 @@ func (w *ciVisibilityTraceWriter) flush() { requestCompressedType = telemetry.CompressedRequestCompressedType } telemetry.EndpointPayloadRequests(telemetry.TestCycleEndpointType, requestCompressedType) - - for attempt := 0; attempt <= w.config.sendRetries; attempt++ { + sendRetries := w.config.internalConfig.SendRetries() + for attempt := 0; attempt <= sendRetries; attempt++ { stats := p.stats() size, count = stats.size, stats.itemCount log.Debug("ciVisibilityTraceWriter: sending payload: size: %d events: %d\n", size, count) @@ -122,7 +122,7 @@ func (w *ciVisibilityTraceWriter) flush() { log.Debug("ciVisibilityTraceWriter: sent events after %d attempts", attempt+1) return } - log.Error("ciVisibilityTraceWriter: failure sending events (attempt %d of %d): %v", attempt+1, w.config.sendRetries+1, err.Error()) + log.Error("ciVisibilityTraceWriter: failure sending events (attempt %d of %d): %v", attempt+1, sendRetries+1, err.Error()) p.reset() time.Sleep(w.config.retryInterval) } diff --git a/ddtrace/tracer/civisibility_writer_test.go b/ddtrace/tracer/civisibility_writer_test.go index 113f6bd5ca..0eed7cf0a9 100644 --- a/ddtrace/tracer/civisibility_writer_test.go +++ b/ddtrace/tracer/civisibility_writer_test.go @@ -13,6 +13,7 @@ import ( "testing" "time" + internalconfig "github.com/DataDog/dd-trace-go/v2/internal/config" "github.com/stretchr/testify/assert" "github.com/tinylib/msgp/msgp" ) @@ -90,7 +91,7 @@ func TestCiVisibilityTraceWriterFlushRetries(t *testing.T) { } c, err := newTestConfig(func(c *config) { c.transport = p - c.sendRetries = test.configRetries + c.internalConfig.SetSendRetries(test.configRetries, internalconfig.OriginCode) c.retryInterval = test.retryInterval }) assert.NoError(err) diff --git a/ddtrace/tracer/option.go b/ddtrace/tracer/option.go index 0ce39ee3a7..6d51e78d83 100644 --- a/ddtrace/tracer/option.go +++ b/ddtrace/tracer/option.go @@ -162,10 +162,6 @@ type config struct { // output instead of using the agent. This is used in Lambda environments. logToStdout bool - // sendRetries is the number of times a trace or CI Visibility payload send is retried upon - // failure. - sendRetries int - // retryInterval is the interval between agent connection retries. It has no effect if sendRetries is not set retryInterval time.Duration @@ -1026,7 +1022,7 @@ func WithLambdaMode(enabled bool) StartOption { // most `retries` times. func WithSendRetries(retries int) StartOption { return func(c *config) { - c.sendRetries = retries + c.internalConfig.SetSendRetries(retries, telemetry.OriginCode) } } diff --git a/ddtrace/tracer/option_test.go b/ddtrace/tracer/option_test.go index b08360eca4..62cb5366ed 100644 --- a/ddtrace/tracer/option_test.go +++ b/ddtrace/tracer/option_test.go @@ -920,7 +920,7 @@ func TestTracerOptionsDefaults(t *testing.T) { t.Run("trace-retries", func(t *testing.T) { c, err := newTestConfig() assert.NoError(t, err) - assert.Equal(t, 0, c.sendRetries) + assert.Equal(t, 0, c.internalConfig.SendRetries()) assert.Equal(t, time.Millisecond, c.retryInterval) }) } @@ -929,7 +929,7 @@ func TestTraceRetry(t *testing.T) { t.Run("sendRetries", func(t *testing.T) { c, err := newTestConfig(WithSendRetries(10)) assert.NoError(t, err) - assert.Equal(t, 10, c.sendRetries) + assert.Equal(t, 10, c.internalConfig.SendRetries()) }) t.Run("retryInterval", func(t *testing.T) { c, err := newTestConfig(WithRetryInterval(10)) diff --git a/ddtrace/tracer/telemetry.go b/ddtrace/tracer/telemetry.go index 4704efd773..9136a2151b 100644 --- a/ddtrace/tracer/telemetry.go +++ b/ddtrace/tracer/telemetry.go @@ -40,7 +40,7 @@ func startTelemetry(c *config) telemetry.Client { {Name: "stats_computation_enabled", Value: c.canComputeStats()}, {Name: "dogstatsd_port", Value: c.agent.StatsdPort}, {Name: "lambda_mode", Value: c.logToStdout}, - {Name: "send_retries", Value: c.sendRetries}, + {Name: "send_retries", Value: c.internalConfig.SendRetries()}, {Name: "retry_interval", Value: c.retryInterval}, {Name: "trace_startup_logs_enabled", Value: c.logStartup}, {Name: "service", Value: c.serviceName}, diff --git a/ddtrace/tracer/writer.go b/ddtrace/tracer/writer.go index ea5c1c55b6..962073a47d 100644 --- a/ddtrace/tracer/writer.go +++ b/ddtrace/tracer/writer.go @@ -131,7 +131,8 @@ func (h *agentTraceWriter) flush() { stats := p.stats() var err error - for attempt := 0; attempt <= h.config.sendRetries; attempt++ { + sendRetries := h.config.internalConfig.SendRetries() + for attempt := 0; attempt <= sendRetries; attempt++ { log.Debug("Attempt to send payload: size: %d traces: %d\n", stats.size, stats.itemCount) var rc io.ReadCloser rc, err = h.config.transport.send(p) @@ -146,7 +147,7 @@ func (h *agentTraceWriter) flush() { } if attempt+1%5 == 0 { - log.Error("failure sending traces (attempt %d of %d): %v", attempt+1, h.config.sendRetries+1, err.Error()) + log.Error("failure sending traces (attempt %d of %d): %v", attempt+1, sendRetries+1, err.Error()) } p.reset() time.Sleep(h.config.retryInterval) diff --git a/ddtrace/tracer/writer_test.go b/ddtrace/tracer/writer_test.go index ec6a5fe5a8..6277930d09 100644 --- a/ddtrace/tracer/writer_test.go +++ b/ddtrace/tracer/writer_test.go @@ -20,7 +20,9 @@ import ( "testing" "time" + internalconfig "github.com/DataDog/dd-trace-go/v2/internal/config" "github.com/DataDog/dd-trace-go/v2/internal/log" + "github.com/DataDog/dd-trace-go/v2/internal/processtags" "github.com/DataDog/dd-trace-go/v2/internal/statsdtest" "github.com/stretchr/testify/assert" @@ -372,6 +374,9 @@ func (t *failingTransport) send(p payload) (io.ReadCloser, error) { } func TestTraceWriterFlushRetries(t *testing.T) { + // Reload process tags to ensure consistent state (previous tests may have disabled them) + processtags.Reload() + testcases := []struct { configRetries int retryInterval time.Duration @@ -397,7 +402,7 @@ func TestTraceWriterFlushRetries(t *testing.T) { sentCounts := map[string]int64{ "datadog.tracer.decode_error": 1, - "datadog.tracer.flush_bytes": 185, + "datadog.tracer.flush_bytes": 308, "datadog.tracer.flush_traces": 1, "datadog.tracer.queue.enqueued.traces": 1, } @@ -417,7 +422,7 @@ func TestTraceWriterFlushRetries(t *testing.T) { } c, err := newTestConfig(func(c *config) { c.transport = p - c.sendRetries = test.configRetries + c.internalConfig.SetSendRetries(test.configRetries, internalconfig.OriginCode) c.retryInterval = test.retryInterval }) assert.Nil(err) diff --git a/internal/config/config.go b/internal/config/config.go index 143e1adf78..bd477770a2 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -63,6 +63,9 @@ type Config struct { ciVisibilityAgentless bool logDirectory string traceRateLimitPerSecond float64 + // sendRetries is the number of times a trace or CI Visibility payload send is retried upon + // failure. + sendRetries int } // loadConfig initializes and returns a new config by reading from all configured sources. @@ -98,6 +101,7 @@ func loadConfig() *Config { cfg.ciVisibilityAgentless = provider.getBool("DD_CIVISIBILITY_AGENTLESS_ENABLED", false) cfg.logDirectory = provider.getString("DD_TRACE_LOG_DIRECTORY", "") cfg.traceRateLimitPerSecond = provider.getFloat("DD_TRACE_RATE_LIMIT", 0.0) + cfg.sendRetries = provider.getInt("DD_TRACE_SEND_RETRIES", 0) return cfg } @@ -134,3 +138,16 @@ func (c *Config) SetDebug(enabled bool, origin telemetry.Origin) { c.debug = enabled telemetry.RegisterAppConfig("DD_TRACE_DEBUG", enabled, origin) } + +func (c *Config) SendRetries() int { + c.mu.RLock() + defer c.mu.RUnlock() + return c.sendRetries +} + +func (c *Config) SetSendRetries(retries int, origin telemetry.Origin) { + c.mu.Lock() + defer c.mu.Unlock() + c.sendRetries = retries + telemetry.RegisterAppConfig("DD_TRACE_SEND_RETRIES", retries, origin) +} diff --git a/internal/env/supported_configurations.gen.go b/internal/env/supported_configurations.gen.go index 36ba72fae5..3e373e8290 100644 --- a/internal/env/supported_configurations.gen.go +++ b/internal/env/supported_configurations.gen.go @@ -208,6 +208,7 @@ var SupportedConfigurations = map[string]struct{}{ "DD_TRACE_SAMPLING_RULES": {}, "DD_TRACE_SAMPLING_RULES_FILE": {}, "DD_TRACE_SARAMA_ANALYTICS_ENABLED": {}, + "DD_TRACE_SEND_RETRIES": {}, "DD_TRACE_SOURCE_HOSTNAME": {}, "DD_TRACE_SPAN_ATTRIBUTE_SCHEMA": {}, "DD_TRACE_SQL_ANALYTICS_ENABLED": {}, diff --git a/internal/env/supported_configurations.json b/internal/env/supported_configurations.json index ad2da85ed0..30f22f4eb2 100644 --- a/internal/env/supported_configurations.json +++ b/internal/env/supported_configurations.json @@ -597,6 +597,9 @@ "DD_TRACE_SARAMA_ANALYTICS_ENABLED": [ "A" ], + "DD_TRACE_SEND_RETRIES": [ + "A" + ], "DD_TRACE_SOURCE_HOSTNAME": [ "A" ], From 30bec03306c527b985280f60e5f8cfa89a05e9b6 Mon Sep 17 00:00:00 2001 From: Mikayla Toffler Date: Wed, 10 Dec 2025 11:21:30 -0500 Subject: [PATCH 2/2] Make flush_bytes value dynamic based on payload size --- ddtrace/tracer/writer_test.go | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/ddtrace/tracer/writer_test.go b/ddtrace/tracer/writer_test.go index 6277930d09..b9c186987f 100644 --- a/ddtrace/tracer/writer_test.go +++ b/ddtrace/tracer/writer_test.go @@ -400,18 +400,11 @@ func TestTraceWriterFlushRetries(t *testing.T) { {configRetries: 2, retryInterval: 2 * time.Millisecond, failCount: 2, tracesSent: true, expAttempts: 3}, } - sentCounts := map[string]int64{ - "datadog.tracer.decode_error": 1, - "datadog.tracer.flush_bytes": 308, - "datadog.tracer.flush_traces": 1, - "datadog.tracer.queue.enqueued.traces": 1, - } - droppedCounts := map[string]int64{ - "datadog.tracer.queue.enqueued.traces": 1, - "datadog.tracer.traces_dropped": 1, - } - ss := []*Span{makeSpan(0)} + + // Capture expected payload size from first test run to be OS-agnostic and forward-compatible + var expectedPayloadSize int64 + for _, test := range testcases { name := fmt.Sprintf("%d-%d-%t-%d", test.configRetries, test.failCount, test.tracesSent, test.expAttempts) t.Run(name, func(t *testing.T) { @@ -430,6 +423,12 @@ func TestTraceWriterFlushRetries(t *testing.T) { h := newAgentTraceWriter(c, newPrioritySampler(), &statsd) h.add(ss) + + // Capture payload size from first test run for dynamic assertions + if expectedPayloadSize == 0 { + expectedPayloadSize = int64(h.payload.size()) + } + start := time.Now() h.flush() h.wg.Wait() @@ -440,8 +439,18 @@ func TestTraceWriterFlushRetries(t *testing.T) { assert.Equal(1, len(statsd.TimingCalls())) if test.tracesSent { + sentCounts := map[string]int64{ + "datadog.tracer.decode_error": 1, + "datadog.tracer.flush_bytes": expectedPayloadSize, + "datadog.tracer.flush_traces": 1, + "datadog.tracer.queue.enqueued.traces": 1, + } assert.Equal(sentCounts, statsd.Counts()) } else { + droppedCounts := map[string]int64{ + "datadog.tracer.queue.enqueued.traces": 1, + "datadog.tracer.traces_dropped": 1, + } assert.Equal(droppedCounts, statsd.Counts()) } if test.configRetries > 0 && test.failCount > 1 {