Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dynamic Instrumentation] Fixed race condition with ProbeStatusPoller dispose #6409

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ internal class ProbeStatusPoller : IProbeStatusPoller
private readonly TimeSpan _longPeriod = TimeSpan.FromMinutes(60);
private readonly HashSet<FetchProbeStatus> _probes = new();
private readonly object _locker = new object();
private Timer _pollerTimer;
private volatile Timer _pollerTimer;
private volatile bool _isDisposed;
private bool _isPolling;
private bool _isRecentlyForcedSchedule;

Expand All @@ -39,6 +40,11 @@ internal static ProbeStatusPoller Create(DiagnosticsSink diagnosticsSink, Debugg

private void PollerCallback(object state)
{
if (_isDisposed)
{
return;
}

if (TryAcquireLock())
{
try
Expand Down Expand Up @@ -82,7 +88,11 @@ private void PausePollerTimer()
{
try
{
_pollerTimer?.Change(Timeout.Infinite, Timeout.Infinite);
var timer = _pollerTimer;
if (timer != null && !_isDisposed)
{
timer.Change(Timeout.Infinite, Timeout.Infinite);
}
}
catch (Exception ex)
{
Expand All @@ -94,9 +104,13 @@ private void ResumePollerTimer()
{
try
{
var waitPeriod = _isRecentlyForcedSchedule ? _shortPeriod : _longPeriod;
_pollerTimer?.Change(waitPeriod, waitPeriod);
_isRecentlyForcedSchedule = false;
var timer = _pollerTimer;
if (timer != null && !_isDisposed)
{
var waitPeriod = _isRecentlyForcedSchedule ? _shortPeriod : _longPeriod;
timer.Change(waitPeriod, waitPeriod);
_isRecentlyForcedSchedule = false;
}
}
catch (Exception ex)
{
Expand All @@ -106,7 +120,7 @@ private void ResumePollerTimer()

private void OnProbeStatusesPoll()
{
if (!_probes.Any())
if (!_probes.Any() || _isDisposed)
{
return;
}
Expand Down Expand Up @@ -142,14 +156,14 @@ private void OnProbeStatusesPoll()

public void StartPolling()
{
if (_isPolling)
if (_isPolling || _isDisposed)
{
return;
}

lock (_locker)
{
if (_isPolling)
if (_isPolling || _isDisposed)
{
return;
}
Expand All @@ -161,6 +175,11 @@ public void StartPolling()

public void AddProbes(FetchProbeStatus[] newProbes)
{
if (_isDisposed)
{
return;
}

lock (_locker)
{
_probes.UnionWith(newProbes);
Expand All @@ -172,16 +191,25 @@ private void ScheduleNextPollInOneSecond()
{
lock (_locker)
{
if (_isPolling)
if (_isPolling && !_isDisposed)
{
_pollerTimer?.Change(TimeSpan.FromSeconds(1), _shortPeriod);
_isRecentlyForcedSchedule = true;
var timer = _pollerTimer;
if (timer != null)
{
timer.Change(TimeSpan.FromSeconds(1), _shortPeriod);
_isRecentlyForcedSchedule = true;
}
}
}
}

public void RemoveProbes(string[] removedProbes)
{
if (_isDisposed)
{
return;
}

lock (_locker)
{
_probes.RemoveWhere(p => removedProbes.Contains(p.ProbeId));
Expand All @@ -195,6 +223,11 @@ public void RemoveProbes(string[] removedProbes)

public void UpdateProbes(string[] probeIds, FetchProbeStatus[] newProbeStatuses)
{
if (_isDisposed)
{
return;
}

lock (_locker)
{
RemoveProbes(probeIds);
Expand All @@ -216,6 +249,11 @@ public void UpdateProbe(string probeId, FetchProbeStatus newProbeStatus)
/// <returns>An array of ProbeIds that have native representation.</returns>
public string[] GetBoundedProbes(string[] candidateProbeIds)
{
if (_isDisposed)
{
return Array.Empty<string>();
}

lock (_locker)
{
return _probes
Expand All @@ -228,7 +266,25 @@ public string[] GetBoundedProbes(string[] candidateProbeIds)

public void Dispose()
{
_pollerTimer?.Dispose();
if (_isDisposed)
{
return;
}

lock (_locker)
{
if (_isDisposed)
{
return;
}

_isDisposed = true;
_isPolling = false;

var timer = _pollerTimer;
_pollerTimer = null;
timer?.Dispose();
}
}
}
}
121 changes: 121 additions & 0 deletions tracer/test/Datadog.Trace.Tests/Debugger/ProbeStatusPollerTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// <copyright file="ProbeStatusPollerTests.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Threading;
using System.Threading.Tasks;
using Datadog.Trace.Configuration;
using Datadog.Trace.Configuration.Telemetry;
using Datadog.Trace.Debugger;
using Datadog.Trace.Debugger.ProbeStatuses;
using Datadog.Trace.Debugger.Sink;
using Datadog.Trace.Debugger.Sink.Models;
using VerifyXunit;
using Xunit;

namespace Datadog.Trace.Tests.Debugger
{
[UsesVerify]
public class ProbeStatusPollerTests
{
private const string ServiceName = "test-service";
private readonly TestDiagnosticsSink _sink;
private readonly ProbeStatusPoller _poller;

public ProbeStatusPollerTests()
{
_sink = new TestDiagnosticsSink(ServiceName);
_poller = CreatePoller(_sink);
}

[Fact]
public async Task WhenDisposed_NoNewProbesAccepted()
{
var probe = CreateProbe("test-probe-1");
_poller.Dispose();
_poller.AddProbes(new[] { probe });
await Task.Delay(2000);
Assert.Empty(_sink.GetDiagnostics());
}

[Fact]
public async Task WhenDisposedDuringPolling_CompletesGracefully()
{
var probe = CreateProbe("test-probe-2");
var disposalCompleted = new TaskCompletionSource<bool>();
_poller.StartPolling();
_poller.AddProbes(new[] { probe });
await Task.Run(() =>
{
Thread.Sleep(100);
_poller.Dispose();
disposalCompleted.SetResult(true);
});

var disposalTask = await Task.WhenAny(disposalCompleted.Task, Task.Delay(5000));
Assert.Same(disposalTask, disposalCompleted.Task);
}

[Fact]
public async Task WhenProbeStatusChanges_ShouldUpdateDiagnostics()
{
var probeId = "test-probe-3";
var probe = CreateProbe(probeId);
_poller.StartPolling();
_poller.AddProbes(new[] { probe });
await Task.Delay(1000);
var diagnostics = _sink.GetDiagnostics();
Assert.Contains(diagnostics, d => d.DebuggerDiagnostics.Diagnostics.ProbeId == probeId);
_sink.AddProbeStatus(probeId, Status.EMITTING, 1);
await Task.Delay(1000);
diagnostics = _sink.GetDiagnostics();
var updatedProbe = diagnostics.Find(d => d.DebuggerDiagnostics.Diagnostics.ProbeId == probeId);
Assert.NotNull(updatedProbe);
Assert.Equal(Status.EMITTING, updatedProbe.DebuggerDiagnostics.Diagnostics.Status);
}

[Fact]
public async Task WhenProbeRemoved_ShouldNotAppearInDiagnostics()
{
var probeId = "test-probe-4";
var probe = CreateProbe(probeId);
_poller.StartPolling();
_poller.AddProbes(new[] { probe });
await Task.Delay(1000);
_poller.RemoveProbes(new[] { probeId });
await Task.Delay(1000);
var diagnostics = _sink.GetDiagnostics();
Assert.DoesNotContain(diagnostics, d => d.DebuggerDiagnostics.Diagnostics.ProbeId == probeId);
}

private static ProbeStatusPoller CreatePoller(DiagnosticsSink sink)
{
var settings = new DebuggerSettings(
new NameValueConfigurationSource(new NameValueCollection()),
NullConfigurationTelemetry.Instance);
return ProbeStatusPoller.Create(sink, settings);
}

private static FetchProbeStatus CreateProbe(string id, int version = 1)
{
return new FetchProbeStatus(id, version);
}

private class TestDiagnosticsSink : DiagnosticsSink
{
public TestDiagnosticsSink(string serviceName)
: base(serviceName, batchSize: 100, interval: TimeSpan.FromSeconds(1))
{
}

public new List<ProbeStatus> GetDiagnostics()
{
return base.GetDiagnostics();
}
}
}
}
Loading