Skip to content

Commit f94e51b

Browse files
committed
Rework the StatsDManager
Make sure we can't dispose a stats consumer that's in use (as it will throw) Rework to use a "lease" mechanism to track usages Make passing in a statsmanager required
1 parent 511220d commit f94e51b

31 files changed

+580
-259
lines changed

tracer/src/Datadog.Trace/Agent/AgentWriter.cs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ internal class AgentWriter : IAgentWriter
2828
private static readonly ArraySegment<byte> EmptyPayload = new(new byte[] { 0x90 });
2929

3030
private readonly ConcurrentQueue<WorkItem> _pendingTraces = new ConcurrentQueue<WorkItem>();
31-
private readonly IDogStatsd _statsd;
31+
private readonly IStatsdManager _statsd;
3232
private readonly Task _flushTask;
3333
private readonly Task _serializationTask;
3434
private readonly TaskCompletionSource<bool> _processExit = new TaskCompletionSource<bool>();
@@ -67,7 +67,7 @@ internal class AgentWriter : IAgentWriter
6767

6868
private bool _traceMetricsEnabled;
6969

70-
public AgentWriter(IApi api, IStatsAggregator statsAggregator, IDogStatsd statsd, TracerSettings settings)
70+
public AgentWriter(IApi api, IStatsAggregator statsAggregator, IStatsdManager statsd, TracerSettings settings)
7171
: this(api, statsAggregator, statsd, maxBufferSize: settings.TraceBufferSize, batchInterval: settings.TraceBatchInterval, apmTracingEnabled: settings.ApmTracingEnabled, initialTracerMetricsEnabled: settings.Manager.InitialMutableSettings.TracerMetricsEnabled)
7272
{
7373
settings.Manager.SubscribeToChanges(changes =>
@@ -76,16 +76,17 @@ public AgentWriter(IApi api, IStatsAggregator statsAggregator, IDogStatsd statsd
7676
&& mutable.TracerMetricsEnabled != changes.PreviousMutable.TracerMetricsEnabled)
7777
{
7878
Volatile.Write(ref _traceMetricsEnabled, mutable.TracerMetricsEnabled);
79+
_statsd.SetRequired(StatsdConsumer.AgentWriter, mutable.TracerMetricsEnabled);
7980
}
8081
});
8182
}
8283

83-
public AgentWriter(IApi api, IStatsAggregator statsAggregator, IDogStatsd statsd, bool automaticFlush = true, int maxBufferSize = 1024 * 1024 * 10, int batchInterval = 100, bool apmTracingEnabled = true, bool initialTracerMetricsEnabled = false)
84+
public AgentWriter(IApi api, IStatsAggregator statsAggregator, IStatsdManager statsd, bool automaticFlush = true, int maxBufferSize = 1024 * 1024 * 10, int batchInterval = 100, bool apmTracingEnabled = true, bool initialTracerMetricsEnabled = false)
8485
: this(api, statsAggregator, statsd, MovingAverageKeepRateCalculator.CreateDefaultKeepRateCalculator(), automaticFlush, maxBufferSize, batchInterval, apmTracingEnabled, initialTracerMetricsEnabled)
8586
{
8687
}
8788

88-
internal AgentWriter(IApi api, IStatsAggregator statsAggregator, IDogStatsd statsd, IKeepRateCalculator traceKeepRateCalculator, bool automaticFlush, int maxBufferSize, int batchInterval, bool apmTracingEnabled, bool initialTracerMetricsEnabled)
89+
internal AgentWriter(IApi api, IStatsAggregator statsAggregator, IStatsdManager statsd, IKeepRateCalculator traceKeepRateCalculator, bool automaticFlush, int maxBufferSize, int batchInterval, bool apmTracingEnabled, bool initialTracerMetricsEnabled)
8990
{
9091
_statsAggregator = statsAggregator;
9192

@@ -104,6 +105,7 @@ internal AgentWriter(IApi api, IStatsAggregator statsAggregator, IDogStatsd stat
104105

105106
_apmTracingEnabled = apmTracingEnabled;
106107
_traceMetricsEnabled = initialTracerMetricsEnabled;
108+
_statsd.SetRequired(StatsdConsumer.AgentWriter, initialTracerMetricsEnabled);
107109

108110
_serializationTask = automaticFlush ? Task.Factory.StartNew(SerializeTracesLoop, TaskCreationOptions.LongRunning) : Task.CompletedTask;
109111
_serializationTask.ContinueWith(t => Log.Error(t.Exception, "Error in serialization task"), TaskContinuationOptions.OnlyOnFaulted);
@@ -151,8 +153,12 @@ public void WriteTrace(ArraySegment<Span> trace)
151153

152154
if (Volatile.Read(ref _traceMetricsEnabled))
153155
{
154-
_statsd.Increment(TracerMetricNames.Queue.EnqueuedTraces);
155-
_statsd.Increment(TracerMetricNames.Queue.EnqueuedSpans, trace.Count);
156+
using var lease = _statsd.TryGetClientLease();
157+
if (lease.Client is { } statsd)
158+
{
159+
statsd.Increment(TracerMetricNames.Queue.EnqueuedTraces);
160+
statsd.Increment(TracerMetricNames.Queue.EnqueuedSpans, trace.Count);
161+
}
156162
}
157163
}
158164

@@ -327,8 +333,12 @@ async Task InternalBufferFlush()
327333
{
328334
if (Volatile.Read(ref _traceMetricsEnabled))
329335
{
330-
_statsd.Increment(TracerMetricNames.Queue.DequeuedTraces, buffer.TraceCount);
331-
_statsd.Increment(TracerMetricNames.Queue.DequeuedSpans, buffer.SpanCount);
336+
using var lease = _statsd.TryGetClientLease();
337+
if (lease.Client is { } statsd)
338+
{
339+
statsd.Increment(TracerMetricNames.Queue.DequeuedTraces, buffer.TraceCount);
340+
statsd.Increment(TracerMetricNames.Queue.DequeuedSpans, buffer.SpanCount);
341+
}
332342
}
333343

334344
var droppedTraces = Interlocked.Exchange(ref _droppedTraces, 0);
@@ -525,8 +535,12 @@ private void DropTrace(ArraySegment<Span> spans)
525535

526536
if (Volatile.Read(ref _traceMetricsEnabled))
527537
{
528-
_statsd.Increment(TracerMetricNames.Queue.DroppedTraces);
529-
_statsd.Increment(TracerMetricNames.Queue.DroppedSpans, spans.Count);
538+
using var lease = _statsd.TryGetClientLease();
539+
if (lease.Client is { } statsd)
540+
{
541+
statsd.Increment(TracerMetricNames.Queue.DroppedTraces);
542+
statsd.Increment(TracerMetricNames.Queue.DroppedSpans, spans.Count);
543+
}
530544
}
531545
}
532546

tracer/src/Datadog.Trace/Agent/Api.cs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ internal class Api : IApi
3333

3434
private readonly IDatadogLogger _log;
3535
private readonly IApiRequestFactory _apiRequestFactory;
36-
private readonly IDogStatsd _originalStatsd;
36+
private readonly IStatsdManager _statsd;
3737
private readonly string _containerId;
3838
private readonly string _entityId;
3939
private readonly Uri _tracesEndpoint;
@@ -42,13 +42,13 @@ internal class Api : IApi
4242
private readonly bool _partialFlushEnabled;
4343
private readonly SendCallback<SendStatsState> _sendStats;
4444
private readonly SendCallback<SendTracesState> _sendTraces;
45-
private IDogStatsd _statsd;
4645
private string _cachedResponse;
4746
private string _agentVersion;
47+
private bool _healthMetricsEnabled;
4848

4949
public Api(
5050
IApiRequestFactory apiRequestFactory,
51-
IDogStatsd statsd,
51+
IStatsdManager statsd,
5252
Action<Dictionary<string, float>> updateSampleRates,
5353
bool partialFlushEnabled,
5454
bool healthMetricsEnabled,
@@ -60,12 +60,13 @@ public Api(
6060
_sendStats = SendStatsAsyncImpl;
6161
_sendTraces = SendTracesAsyncImpl;
6262
_updateSampleRates = updateSampleRates;
63-
_originalStatsd = statsd;
63+
_statsd = statsd;
6464
ToggleTracerHealthMetrics(healthMetricsEnabled);
6565
_containerId = ContainerMetadata.GetContainerId();
6666
_entityId = ContainerMetadata.GetEntityId();
6767
_apiRequestFactory = apiRequestFactory;
6868
_partialFlushEnabled = partialFlushEnabled;
69+
_healthMetricsEnabled = healthMetricsEnabled;
6970
_tracesEndpoint = _apiRequestFactory.GetEndpoint(TracesPath);
7071
_log.Debug("Using traces endpoint {TracesEndpoint}", _tracesEndpoint.ToString());
7172
_statsEndpoint = _apiRequestFactory.GetEndpoint(StatsPath);
@@ -84,7 +85,8 @@ private enum SendResult
8485
[MemberNotNull(nameof(_statsd))]
8586
public void ToggleTracerHealthMetrics(bool enabled)
8687
{
87-
Interlocked.Exchange(ref _statsd, enabled ? _originalStatsd : null);
88+
Volatile.Write(ref _healthMetricsEnabled, enabled);
89+
_statsd.SetRequired(StatsdConsumer.TraceApi, enabled);
8890
}
8991

9092
public Task<bool> SendStatsAsync(StatsBuffer stats, long bucketDuration)
@@ -309,7 +311,8 @@ private async Task<SendResult> SendTracesAsyncImpl(IApiRequest request, bool fin
309311

310312
try
311313
{
312-
var healthStats = Volatile.Read(ref _statsd);
314+
using var lease = Volatile.Read(ref _healthMetricsEnabled) ? _statsd.TryGetClientLease() : default;
315+
var healthStats = _healthMetricsEnabled ? lease.Client : null;
313316
try
314317
{
315318
TelemetryFactory.Metrics.RecordCountTraceApiRequests();
@@ -332,7 +335,7 @@ private async Task<SendResult> SendTracesAsyncImpl(IApiRequest request, bool fin
332335
string[] tags = { $"status:{response.StatusCode}" };
333336

334337
// count every response, grouped by status code
335-
healthStats?.Increment(TracerMetricNames.Api.Responses, tags: tags);
338+
healthStats.Increment(TracerMetricNames.Api.Responses, tags: tags);
336339
}
337340

338341
TelemetryFactory.Metrics.RecordCountTraceApiResponses(response.GetTelemetryStatusCodeMetricTag());

tracer/src/Datadog.Trace/Agent/ManagedApi.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
using System.Threading;
1212
using System.Threading.Tasks;
1313
using Datadog.Trace.Configuration;
14+
using Datadog.Trace.DogStatsd;
1415
using Datadog.Trace.Vendors.StatsdClient;
1516

1617
namespace Datadog.Trace.Agent;
@@ -24,7 +25,7 @@ internal class ManagedApi : IApi
2425

2526
public ManagedApi(
2627
TracerSettings.SettingsManager settings,
27-
IDogStatsd statsd,
28+
IStatsdManager statsd,
2829
Action<Dictionary<string, float>> updateSampleRates,
2930
bool partialFlushEnabled)
3031
{

tracer/src/Datadog.Trace/Ci/TestOptimizationTracerManager.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
using Datadog.Trace.Ci.EventModel;
1414
using Datadog.Trace.Configuration;
1515
using Datadog.Trace.DataStreamsMonitoring;
16+
using Datadog.Trace.DogStatsd;
1617
using Datadog.Trace.Logging;
1718
using Datadog.Trace.Logging.DirectSubmission;
1819
using Datadog.Trace.Logging.TracerFlare;
@@ -32,7 +33,7 @@ public TestOptimizationTracerManager(
3233
TracerSettings settings,
3334
IAgentWriter agentWriter,
3435
IScopeManager scopeManager,
35-
IDogStatsd statsd,
36+
IStatsdManager statsd,
3637
RuntimeMetricsWriter runtimeMetricsWriter,
3738
DirectLogSubmissionManager logSubmissionManager,
3839
ITelemetryController telemetry,
@@ -148,7 +149,7 @@ public LockedManager(
148149
TracerSettings settings,
149150
IAgentWriter agentWriter,
150151
IScopeManager scopeManager,
151-
IDogStatsd statsd,
152+
IStatsdManager statsd,
152153
RuntimeMetricsWriter runtimeMetricsWriter,
153154
DirectLogSubmissionManager logSubmissionManager,
154155
ITelemetryController telemetry,

tracer/src/Datadog.Trace/Ci/TestOptimizationTracerManagerFactory.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
using Datadog.Trace.Ci.Sampling;
1515
using Datadog.Trace.Configuration;
1616
using Datadog.Trace.DataStreamsMonitoring;
17+
using Datadog.Trace.DogStatsd;
1718
using Datadog.Trace.Logging.DirectSubmission;
1819
using Datadog.Trace.Logging.TracerFlare;
1920
using Datadog.Trace.RemoteConfigurationManagement;
@@ -41,7 +42,7 @@ protected override TracerManager CreateTracerManagerFrom(
4142
TracerSettings settings,
4243
IAgentWriter agentWriter,
4344
IScopeManager scopeManager,
44-
IDogStatsd statsd,
45+
IStatsdManager statsd,
4546
RuntimeMetricsWriter runtimeMetrics,
4647
DirectLogSubmissionManager logSubmissionManager,
4748
ITelemetryController telemetry,
@@ -87,7 +88,7 @@ protected override ITraceSampler GetSampler(TracerSettings settings)
8788

8889
protected override bool ShouldEnableRemoteConfiguration(TracerSettings settings) => false;
8990

90-
protected override IAgentWriter GetAgentWriter(TracerSettings settings, IDogStatsd statsd, Action<Dictionary<string, float>> updateSampleRates, IDiscoveryService discoveryService, TelemetrySettings telemetrySettings)
91+
protected override IAgentWriter GetAgentWriter(TracerSettings settings, IStatsdManager statsd, Action<Dictionary<string, float>> updateSampleRates, IDiscoveryService discoveryService, TelemetrySettings telemetrySettings)
9192
{
9293
// Check for agentless scenario
9394
if (_settings.Agentless)
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// <copyright file="IStatsdManager.cs" company="Datadog">
2+
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
4+
// </copyright>
5+
6+
#nullable enable
7+
using System;
8+
using Datadog.Trace.Vendors.StatsdClient;
9+
10+
namespace Datadog.Trace.DogStatsd;
11+
12+
internal interface IStatsdManager : IDisposable
13+
{
14+
/// <summary>
15+
/// Obtain a <see cref="StatsdManager.StatsdClientLease"/> for accessing a <see cref="IDogStatsd"/> instance.
16+
/// The lease must be disposed after all references to the client have gone.
17+
/// </summary>
18+
StatsdManager.StatsdClientLease TryGetClientLease();
19+
20+
/// <summary>
21+
/// Called by users of <see cref="StatsdManager"/> to indicate that a "live" client is required.
22+
/// Each unique consumer of <see cref="StatsdManager"/> should set a different
23+
/// <see cref="StatsdConsumer"/> value.
24+
/// </summary>
25+
void SetRequired(StatsdConsumer consumer, bool enabled);
26+
}

tracer/src/Datadog.Trace/DogStatsd/NoOpStatsd.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
namespace Datadog.Trace.DogStatsd
1010
{
11-
internal class NoOpStatsd : IDogStatsd
11+
internal sealed class NoOpStatsd : IDogStatsd
1212
{
1313
public static readonly NoOpStatsd Instance = new();
1414

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// <copyright file="StatsdConsumer.cs" company="Datadog">
2+
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
4+
// </copyright>
5+
6+
#nullable enable
7+
8+
using System;
9+
10+
namespace Datadog.Trace.DogStatsd;
11+
12+
[Flags]
13+
internal enum StatsdConsumer
14+
{
15+
// None = 0, Must not use this
16+
// Define bits per consumer:
17+
RuntimeMetricsWriter = 1 << 0,
18+
TraceApi = 1 << 1,
19+
AgentWriter = 1 << 2,
20+
}

0 commit comments

Comments
 (0)