Skip to content
145 changes: 145 additions & 0 deletions src/Daqifi.Core.Tests/Device/DaqifiDeviceTextCommandLockTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
using System;
using System.Net;
using System.Reflection;
using System.Threading.Tasks;
using Daqifi.Core.Device;
using Xunit;

namespace Daqifi.Core.Tests.Device
{
    /// <summary>
    /// Tests for #186 — ExecuteTextCommandAsync must serialize concurrent
    /// callers (SemaphoreSlim), reject same-thread re-entrant calls
    /// (InvalidOperationException, not deadlock), and reject calls when
    /// the device is disposed or disconnecting.
    ///
    /// The protected method is exercised via a thin subclass that exposes
    /// it. Because spinning up a real transport here would be fragile, the
    /// pre-lock re-entrancy guard and the disposed/disconnecting guard are
    /// tested by setting the relevant private fields via reflection, so no
    /// transport stack is required.
    /// </summary>
    public class DaqifiDeviceTextCommandLockTests
    {
        // Names of the private DaqifiDevice fields the tests poke via
        // reflection. Kept in one place so a rename in production code
        // only needs a single update here.
        private const string OwnerThreadIdFieldName = "_textExchangeOwnerThreadId";
        private const string IsDisconnectingFieldName = "_isDisconnecting";
        private const string DisposedFieldName = "_disposed";

        // Upper bound on how long a call may take to get past the lock.
        // xUnit applies no per-test timeout by default, so without this a
        // leaked lock would hang the whole test run instead of failing.
        private static readonly TimeSpan LockAcquireBudget = TimeSpan.FromSeconds(5);

        /// <summary>
        /// A call made while the owner-thread tracker already names the
        /// current thread must be rejected with the "not re-entrant" error.
        /// </summary>
        [Fact]
        public async Task ExecuteTextCommandAsync_WhenSameThreadAlreadyOwnsLock_ThrowsInvalidOperation()
        {
            var device = new TextCommandTestableDevice("TestDevice");

            // Simulate "we're already inside ExecuteTextCommandAsync on this
            // thread" by directly setting the owner-thread tracker. The
            // re-entrancy guard runs before WaitAsync(), so this check
            // fires immediately without touching any transport state.
            SetOwnerThreadId(device, Environment.CurrentManagedThreadId);

            var ex = await Assert.ThrowsAsync<InvalidOperationException>(
                () => device.CallExecuteTextCommandAsync(() => { }));
            Assert.Contains("not re-entrant", ex.Message);
        }

        /// <summary>
        /// A call made while the disconnecting flag is set must be rejected.
        /// (The test name says "Disposing"; the flag it sets is
        /// <c>_isDisconnecting</c> — both states share one error message.)
        /// </summary>
        [Fact]
        public async Task ExecuteTextCommandAsync_WhenDisposing_ThrowsInvalidOperation()
        {
            var device = new TextCommandTestableDevice("TestDevice");
            SetIsDisconnecting(device, true);

            var ex = await Assert.ThrowsAsync<InvalidOperationException>(
                () => device.CallExecuteTextCommandAsync(() => { }));
            Assert.Contains("disposing or disconnecting", ex.Message);
        }

        /// <summary>
        /// A call made while the disposed flag is set must be rejected.
        /// </summary>
        [Fact]
        public async Task ExecuteTextCommandAsync_WhenDisposed_ThrowsInvalidOperation()
        {
            var device = new TextCommandTestableDevice("TestDevice");
            SetDisposed(device, true);

            var ex = await Assert.ThrowsAsync<InvalidOperationException>(
                () => device.CallExecuteTextCommandAsync(() => { }));
            Assert.Contains("disposing or disconnecting", ex.Message);
        }

        /// <summary>
        /// After a validation failure (the device is not connected) the lock
        /// must be released so subsequent calls reach validation too instead
        /// of blocking on WaitAsync.
        /// </summary>
        [Fact]
        public async Task ExecuteTextCommandAsync_ReleasesLockAfterValidationFailure()
        {
            var device = new TextCommandTestableDevice("TestDevice");

            var first = await Assert.ThrowsAsync<InvalidOperationException>(
                () => device.CallExecuteTextCommandAsync(() => { }));
            Assert.Contains("not connected", first.Message);

            // Second call: must fail the same way, which is only possible
            // if the lock was released. Race it against a time budget so a
            // leaked lock produces a test failure rather than a hung run.
            var secondCall = device.CallExecuteTextCommandAsync(() => { });
            var finished = await Task.WhenAny(secondCall, Task.Delay(LockAcquireBudget));
            Assert.True(
                ReferenceEquals(finished, secondCall),
                "Second call did not complete; the text-exchange lock appears to have leaked.");

            // Check the message, not just the exception type: the
            // re-entrancy guard also throws InvalidOperationException, so a
            // type-only assertion could pass even if cleanup was skipped and
            // a stale owner-thread id rejected the second call.
            var second = await Assert.ThrowsAsync<InvalidOperationException>(() => secondCall);
            Assert.Contains("not connected", second.Message);
        }

        /// <summary>
        /// Even when the call throws, the owner-thread tracker must be
        /// cleared so a subsequent call from the same thread doesn't
        /// false-positive the re-entrancy check.
        /// </summary>
        [Fact]
        public async Task ExecuteTextCommandAsync_OwnerThreadIdClearedAfterReturn()
        {
            var device = new TextCommandTestableDevice("TestDevice");

            await Assert.ThrowsAsync<InvalidOperationException>(
                () => device.CallExecuteTextCommandAsync(() => { }));

            Assert.Null(GetOwnerThreadId(device));
        }

        // ── Reflection helpers — kept private to this test class so the
        //    production DaqifiDevice doesn't have to expose internals. ─────

        /// <summary>
        /// Looks up a private instance field on <see cref="DaqifiDevice"/>.
        /// </summary>
        /// <param name="fieldName">Name of the private field.</param>
        /// <returns>The reflected field.</returns>
        /// <exception cref="MissingFieldException">
        /// The field does not exist (for example it was renamed), so the
        /// failure names the field instead of surfacing as a
        /// NullReferenceException.
        /// </exception>
        private static FieldInfo GetPrivateField(string fieldName)
        {
            return typeof(DaqifiDevice).GetField(fieldName, BindingFlags.Instance | BindingFlags.NonPublic)
                ?? throw new MissingFieldException(nameof(DaqifiDevice), fieldName);
        }

        /// <summary>Overwrites the device's owner-thread tracker.</summary>
        private static void SetOwnerThreadId(DaqifiDevice device, int? value)
        {
            GetPrivateField(OwnerThreadIdFieldName).SetValue(device, value);
        }

        /// <summary>Reads the device's owner-thread tracker.</summary>
        private static int? GetOwnerThreadId(DaqifiDevice device)
        {
            return (int?)GetPrivateField(OwnerThreadIdFieldName).GetValue(device);
        }

        /// <summary>Overwrites the device's disconnecting flag.</summary>
        private static void SetIsDisconnecting(DaqifiDevice device, bool value)
        {
            GetPrivateField(IsDisconnectingFieldName).SetValue(device, value);
        }

        /// <summary>Overwrites the device's disposed flag.</summary>
        private static void SetDisposed(DaqifiDevice device, bool value)
        {
            GetPrivateField(DisposedFieldName).SetValue(device, value);
        }

        /// <summary>
        /// Subclass that exposes the protected ExecuteTextCommandAsync via
        /// a public wrapper so tests can call it directly. Does NOT override
        /// it — the real method runs, including the lock + guards.
        /// </summary>
        private class TextCommandTestableDevice : DaqifiDevice
        {
            public TextCommandTestableDevice(string name, IPAddress? ipAddress = null)
                : base(name, ipAddress)
            {
            }

            /// <summary>
            /// Forwards to the protected ExecuteTextCommandAsync with short
            /// timeouts (100 ms response, 50 ms completion).
            /// </summary>
            /// <param name="setupAction">Callback passed through unchanged.</param>
            /// <returns>The task returned by ExecuteTextCommandAsync.</returns>
            public Task<System.Collections.Generic.IReadOnlyList<string>> CallExecuteTextCommandAsync(
                Action setupAction)
            {
                return ExecuteTextCommandAsync(setupAction, responseTimeoutMs: 100, completionTimeoutMs: 50);
            }
        }
    }
}
79 changes: 66 additions & 13 deletions src/Daqifi.Core/Device/DaqifiDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,25 @@ public class DaqifiDevice : IDevice, IDisposable
private bool _isDisconnecting;
private bool _isInitialized;
private readonly List<IChannel> _channels = new();

// Serializes ExecuteTextCommandAsync calls device-wide (closes #186).
// Multiple callers — e.g. concurrent GetSdCardFilesAsync /
// DrainErrorQueueAsync / GetSystemInfoAsync — would otherwise race the
// protobuf-consumer pause/swap/restart sequence on the same stream and
// either intermix SCPI bytes on the wire or interleave reply lines
// between callers' returned lists. SemaphoreSlim chosen over Lock
// because the method is async; counter is (1, 1) for mutual exclusion.
private readonly SemaphoreSlim _textExchangeLock = new(1, 1);

// Thread that currently holds _textExchangeLock, or null when free.
// Lets ExecuteTextCommandAsync detect a same-thread re-entrant call
// (a setupAction that itself calls back into ExecuteTextCommandAsync)
// and throw InvalidOperationException instead of deadlocking on
// _textExchangeLock.WaitAsync(). Same safety guarantee — the
// re-entrant call would corrupt the consumer swap; better failure
// mode for callers (clean exception, stack trace) than a hung
// process (closes #186 follow-up).
private int? _textExchangeOwnerThreadId;

/// <summary>
/// Gets the current connection status of the device.
Expand Down Expand Up @@ -328,26 +347,53 @@ protected virtual async Task<IReadOnlyList<string>> ExecuteTextCommandAsync(
if (completionTimeoutMs <= 0)
throw new ArgumentOutOfRangeException(nameof(completionTimeoutMs), completionTimeoutMs, "Timeout must be positive.");

var sw = Stopwatch.StartNew();
cancellationToken.ThrowIfCancellationRequested();

if (!IsConnected)
// Same-thread re-entrancy detection: a setupAction that calls
// ExecuteTextCommandAsync on the same device would corrupt the
// consumer swap mid-flight. Surface as a clean exception rather
// than wedging on _textExchangeLock.WaitAsync() forever.
var currentTid = Environment.CurrentManagedThreadId;
if (_textExchangeOwnerThreadId == currentTid)
{
throw new InvalidOperationException("Device is not connected.");
throw new InvalidOperationException(
"ExecuteTextCommandAsync is not re-entrant on the same device; "
+ "do not call it from inside a setupAction callback.");
}

if (_transport == null)
await _textExchangeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
_textExchangeOwnerThreadId = currentTid;
try
Comment thread
qodo-code-review[bot] marked this conversation as resolved.
Outdated
{
throw new InvalidOperationException("ExecuteTextCommandAsync requires a transport-based connection.");
}
// All validation runs INSIDE the lock so a competing thread
// calling DisconnectAsync() / Dispose() while we're blocked
// on WaitAsync() doesn't leave us with a stale _transport /
// _messageConsumer reference (closes the TOCTOU window
// documented in #186).
if (_disposed || _isDisconnecting)
{
throw new InvalidOperationException(
"ExecuteTextCommandAsync cannot run while the device is "
+ "disposing or disconnecting.");
}

cancellationToken.ThrowIfCancellationRequested();
if (!IsConnected)
{
throw new InvalidOperationException("Device is not connected.");
}

if (_transport == null)
{
throw new InvalidOperationException("ExecuteTextCommandAsync requires a transport-based connection.");
}

var collectedLines = new List<string>();
var stream = _transport.Stream;
int? originalReadTimeout = null;
var sw = Stopwatch.StartNew();
var collectedLines = new List<string>();
var stream = _transport.Stream;
int? originalReadTimeout = null;

try
{
try
{
if (stream.CanTimeout)
{
try
Expand Down Expand Up @@ -468,7 +514,13 @@ protected virtual async Task<IReadOnlyList<string>> ExecuteTextCommandAsync(
Trace.WriteLine($"[ExecuteTextCommandAsync] Total elapsed: {sw.ElapsedMilliseconds}ms");
}

return collectedLines;
return collectedLines;
}
finally
{
_textExchangeOwnerThreadId = null;
_textExchangeLock.Release();
}
}

/// <summary>
Expand Down Expand Up @@ -595,6 +647,7 @@ public void Dispose()
_messageConsumer?.Dispose();
_messageProducer?.Dispose();
_transport?.Dispose();
_textExchangeLock.Dispose();
_disposed = true;
Comment thread
qodo-code-review[bot] marked this conversation as resolved.
}
}
Expand Down
Loading