diff --git a/src/Daqifi.Core.Tests/Device/DaqifiDeviceTextCommandLockTests.cs b/src/Daqifi.Core.Tests/Device/DaqifiDeviceTextCommandLockTests.cs new file mode 100644 index 0000000..5673679 --- /dev/null +++ b/src/Daqifi.Core.Tests/Device/DaqifiDeviceTextCommandLockTests.cs @@ -0,0 +1,139 @@ +using System; +using System.Net; +using System.Reflection; +using System.Threading; +using System.Threading.Tasks; +using Daqifi.Core.Device; +using Xunit; + +namespace Daqifi.Core.Tests.Device +{ + /// + /// Tests for #186 — ExecuteTextCommandAsync must serialize concurrent + /// callers (SemaphoreSlim), reject re-entrant calls from the same + /// async flow (InvalidOperationException, not deadlock), and reject + /// calls when the device is disposed or disconnecting. + /// + /// The protected method is exercised via a thin subclass that exposes + /// it. The disposed/disconnecting guards are tested by setting the + /// relevant private fields via reflection — those guards run before + /// any transport / consumer interaction, so this gives faithful + /// coverage without a transport stack. Re-entrancy is tested by + /// flipping the AsyncLocal flag from inside the same logical flow. + /// + public class DaqifiDeviceTextCommandLockTests + { + [Fact] + public async Task ExecuteTextCommandAsync_WhenAlreadyInsideAsyncFlow_ThrowsInvalidOperation() + { + var device = new TextCommandTestableDevice("TestDevice"); + + // Simulate "we're already inside ExecuteTextCommandAsync on this + // async flow" by setting the AsyncLocal flag. The re-entrancy + // guard runs before WaitAsync(), so this check fires immediately + // without touching any transport state. 
+ GetIsInsideTextExchange(device).Value = true; + + var ex = await Assert.ThrowsAsync( + () => device.CallExecuteTextCommandAsync(() => { })); + Assert.Contains("not re-entrant", ex.Message); + } + + [Fact] + public async Task ExecuteTextCommandAsync_WhenDisposing_ThrowsInvalidOperation() + { + var device = new TextCommandTestableDevice("TestDevice"); + SetIsDisconnecting(device, true); + + var ex = await Assert.ThrowsAsync( + () => device.CallExecuteTextCommandAsync(() => { })); + Assert.Contains("disposing or disconnecting", ex.Message); + } + + [Fact] + public async Task ExecuteTextCommandAsync_WhenDisposed_ThrowsInvalidOperation() + { + var device = new TextCommandTestableDevice("TestDevice"); + SetDisposed(device, true); + + var ex = await Assert.ThrowsAsync( + () => device.CallExecuteTextCommandAsync(() => { })); + Assert.Contains("disposing or disconnecting", ex.Message); + } + + [Fact] + public async Task ExecuteTextCommandAsync_ReleasesLockAfterValidationFailure() + { + // After a validation failure (e.g. not connected), the lock + // must be released so subsequent calls don't hang. Verified + // by calling twice — second call must reach validation too, + // not block on WaitAsync. + var device = new TextCommandTestableDevice("TestDevice"); + + await Assert.ThrowsAsync( + () => device.CallExecuteTextCommandAsync(() => { })); + // Second call: also throws, but ONLY if the lock was released. + // If the lock leaked, this would deadlock and xunit's per-test + // budget would time it out instead. + await Assert.ThrowsAsync( + () => device.CallExecuteTextCommandAsync(() => { })); + } + + [Fact] + public async Task ExecuteTextCommandAsync_AsyncLocalClearedAfterReturn() + { + // Even when the call throws, the AsyncLocal re-entrancy flag + // is cleared in the finally block so a subsequent call from + // the same flow doesn't false-positive the re-entrancy check. 
+ var device = new TextCommandTestableDevice("TestDevice"); + + await Assert.ThrowsAsync( + () => device.CallExecuteTextCommandAsync(() => { })); + + Assert.False(GetIsInsideTextExchange(device).Value); + } + + // ── Reflection helpers — kept private to this test class so the + // production DaqifiDevice doesn't have to expose internals. ───── + + private static AsyncLocal GetIsInsideTextExchange(DaqifiDevice device) + { + return (AsyncLocal)typeof(DaqifiDevice) + .GetField("_isInsideTextExchange", BindingFlags.Instance | BindingFlags.NonPublic)! + .GetValue(device)!; + } + + private static void SetIsDisconnecting(DaqifiDevice device, bool value) + { + typeof(DaqifiDevice) + .GetField("_isDisconnecting", BindingFlags.Instance | BindingFlags.NonPublic)! + .SetValue(device, value); + } + + private static void SetDisposed(DaqifiDevice device, bool value) + { + typeof(DaqifiDevice) + .GetField("_disposed", BindingFlags.Instance | BindingFlags.NonPublic)! + .SetValue(device, value); + } + + /// + /// Subclass that exposes the protected ExecuteTextCommandAsync via + /// a public wrapper so tests can call it directly. Does NOT override + /// it — the real method runs, including the lock + guards. + /// + private class TextCommandTestableDevice : DaqifiDevice + { + public TextCommandTestableDevice(string name, IPAddress? 
ipAddress = null) + : base(name, ipAddress) + { + } + + public Task> CallExecuteTextCommandAsync( + Action setupAction) + { + return ExecuteTextCommandAsync(setupAction, responseTimeoutMs: 100, completionTimeoutMs: 50); + } + } + } +} diff --git a/src/Daqifi.Core/Device/DaqifiDevice.cs b/src/Daqifi.Core/Device/DaqifiDevice.cs index 62963bc..53ac332 100644 --- a/src/Daqifi.Core/Device/DaqifiDevice.cs +++ b/src/Daqifi.Core/Device/DaqifiDevice.cs @@ -80,6 +80,26 @@ public class DaqifiDevice : IDevice, IDisposable private bool _isDisconnecting; private bool _isInitialized; private readonly List _channels = new(); + + // Serializes ExecuteTextCommandAsync calls device-wide (closes #186). + // Multiple callers — e.g. concurrent GetSdCardFilesAsync / + // DrainErrorQueueAsync / GetSystemInfoAsync — would otherwise race the + // protobuf-consumer pause/swap/restart sequence on the same stream and + // either intermix SCPI bytes on the wire or interleave reply lines + // between callers' returned lists. SemaphoreSlim chosen over Lock + // because the method is async; counter is (1, 1) for mutual exclusion. + private readonly SemaphoreSlim _textExchangeLock = new(1, 1); + + // Async-context flag that tracks whether the current logical flow + // already holds _textExchangeLock. AsyncLocal flows across await + // resumptions on different threads, so a setupAction that re-enters + // ExecuteTextCommandAsync after a ConfigureAwait(false) hop is still + // detected and surfaced as InvalidOperationException — instead of + // wedging on _textExchangeLock.WaitAsync() (the re-entrant call + // would corrupt the consumer swap mid-flight). Plain + // Environment.CurrentManagedThreadId capture wouldn't work — the + // value seen before await may not match the value seen after. + private readonly AsyncLocal _isInsideTextExchange = new(); /// /// Gets the current connection status of the device. @@ -198,9 +218,42 @@ public void Connect() /// /// Disconnects from the device. 
/// + /// + /// Waits up to 10 seconds to acquire _textExchangeLock before + /// tearing down the consumer / producer / transport. This prevents + /// a race where an in-flight ExecuteTextCommandAsync call + /// is mid-swap (text consumer running on the stream, protobuf + /// consumer not yet restarted) and Disconnect rips the transport + /// out from under it. If the wait times out, Disconnect proceeds + /// anyway — a stuck text exchange must not block teardown forever. + /// The 10s budget covers the worst-case ExecuteTextCommandAsync + /// hold time with default timeouts (StopSafely up to 1s + maxWait + /// of responseTimeoutMs*5 = 5s by default + safety margin) and + /// most custom-timeout callers; on timeout the exchange holding the + /// lock is not interrupted (it already passed the post-acquisition + /// validation); only callers that acquire the lock later are expected to fail there. Callers wanting non-blocking + /// disconnect should drive this off a Task.Run. + /// public void Disconnect() { _isDisconnecting = true; + // Best-effort coordination with ExecuteTextCommandAsync — + // acquire the lock so we don't tear the transport out from + // under an in-flight text exchange. The lock IS released in + // the finally below when acquired (so a future Connect() + // followed by ExecuteTextCommandAsync isn't blocked); a + // stuck exchange that holds past the timeout keeps running — + // only callers that acquire the lock later hit the validation path. + var lockAcquired = false; + try + { + lockAcquired = _textExchangeLock.Wait(TimeSpan.FromSeconds(10)); + } + catch (ObjectDisposedException) + { + // Disconnect called after Dispose — nothing to coordinate.
+ } + try { // Unsubscribe from message consumer events @@ -222,6 +275,16 @@ public void Disconnect() State = DeviceState.Disconnected; _isInitialized = false; _isDisconnecting = false; + if (lockAcquired) + { + try + { + _textExchangeLock.Release(); + } + catch (ObjectDisposedException) + { + } + } } } @@ -328,147 +391,209 @@ protected virtual async Task> ExecuteTextCommandAsync( if (completionTimeoutMs <= 0) throw new ArgumentOutOfRangeException(nameof(completionTimeoutMs), completionTimeoutMs, "Timeout must be positive."); - var sw = Stopwatch.StartNew(); + cancellationToken.ThrowIfCancellationRequested(); - if (!IsConnected) + // Async-context re-entrancy detection: a setupAction that calls + // ExecuteTextCommandAsync on the same device would corrupt the + // consumer swap mid-flight. Surface as a clean exception rather + // than wedging on _textExchangeLock.WaitAsync() forever. + // AsyncLocal flows across await thread hops so this catches + // re-entry even when the inner call resumes on a different + // thread than the outer call. + if (_isInsideTextExchange.Value) { - throw new InvalidOperationException("Device is not connected."); + throw new InvalidOperationException( + "ExecuteTextCommandAsync is not re-entrant on the same device; " + + "do not call it from inside a setupAction callback."); } - if (_transport == null) + try { - throw new InvalidOperationException("ExecuteTextCommandAsync requires a transport-based connection."); + await _textExchangeLock.WaitAsync(cancellationToken).ConfigureAwait(false); + } + catch (ObjectDisposedException) + { + // Dispose() raced ahead of us and disposed the semaphore. + // Surface the same clean failure as the post-acquisition + // _disposed check below, instead of leaking a low-level + // teardown exception to callers. 
+ throw new InvalidOperationException( + "ExecuteTextCommandAsync cannot run because the device is disposed."); } - cancellationToken.ThrowIfCancellationRequested(); - - var collectedLines = new List(); - var stream = _transport.Stream; - int? originalReadTimeout = null; - + _isInsideTextExchange.Value = true; try { - if (stream.CanTimeout) + // All validation runs INSIDE the lock so a competing thread + // calling DisconnectAsync() / Dispose() while we're blocked + // on WaitAsync() doesn't leave us with a stale _transport / + // _messageConsumer reference (closes the TOCTOU window + // documented in #186). + if (_disposed || _isDisconnecting) { - try - { - originalReadTimeout = stream.ReadTimeout; - stream.ReadTimeout = Math.Min(500, Math.Max(100, responseTimeoutMs / 4)); - } - catch - { - // Some streams may not allow setting read timeout; ignore. - originalReadTimeout = null; - } + throw new InvalidOperationException( + "ExecuteTextCommandAsync cannot run while the device is " + + "disposing or disconnecting."); } - // Stop the protobuf consumer so it doesn't compete for stream bytes. - // The serial transport sets ReadTimeout=500ms after connect, so the - // consumer thread's blocking Read will unblock within 500ms. 
- if (_messageConsumer != null) + if (!IsConnected) { - _messageConsumer.MessageReceived -= OnInboundMessageReceived; - var stopped = _messageConsumer.StopSafely(timeoutMs: 1000); - if (!stopped) - { - _messageConsumer.Stop(); - } + throw new InvalidOperationException("Device is not connected."); } - Trace.WriteLine($"[ExecuteTextCommandAsync] Protobuf consumer stopped at {sw.ElapsedMilliseconds}ms"); + if (_transport == null) + { + throw new InvalidOperationException("ExecuteTextCommandAsync requires a transport-based connection."); + } - // Create a temporary text consumer on the same stream - using var textConsumer = new StreamMessageConsumer( - _transport.Stream, - new LineBasedMessageParser()); + var sw = Stopwatch.StartNew(); + var collectedLines = new List(); + var stream = _transport.Stream; + int? originalReadTimeout = null; - textConsumer.MessageReceived += (_, e) => + try { - collectedLines.Add(e.Message.Data); - }; + if (stream.CanTimeout) + { + try + { + originalReadTimeout = stream.ReadTimeout; + stream.ReadTimeout = Math.Min(500, Math.Max(100, responseTimeoutMs / 4)); + } + catch + { + // Some streams may not allow setting read timeout; ignore. + originalReadTimeout = null; + } + } - textConsumer.Start(); - await Task.Delay(50, cancellationToken); + // Stop the protobuf consumer so it doesn't compete for stream bytes. + // The serial transport sets ReadTimeout=500ms after connect, so the + // consumer thread's blocking Read will unblock within 500ms. 
+ if (_messageConsumer != null) + { + _messageConsumer.MessageReceived -= OnInboundMessageReceived; + var stopped = _messageConsumer.StopSafely(timeoutMs: 1000); + if (!stopped) + { + _messageConsumer.Stop(); + } + } - Trace.WriteLine($"[ExecuteTextCommandAsync] Text consumer started at {sw.ElapsedMilliseconds}ms"); + Trace.WriteLine($"[ExecuteTextCommandAsync] Protobuf consumer stopped at {sw.ElapsedMilliseconds}ms"); - // Execute the setup action (sends SCPI commands) - setupAction(); + // Create a temporary text consumer on the same stream + using var textConsumer = new StreamMessageConsumer( + _transport.Stream, + new LineBasedMessageParser()); - Trace.WriteLine($"[ExecuteTextCommandAsync] Setup action completed at {sw.ElapsedMilliseconds}ms"); + textConsumer.MessageReceived += (_, e) => + { + collectedLines.Add(e.Message.Data); + }; - // Wait for responses using a two-phase inactivity-based timeout: - // Phase 1: Wait up to responseTimeoutMs for the first response. - // Phase 2: After receiving data, wait completionTimeoutMs of inactivity to finish. - var lastMessageTime = DateTime.UtcNow; - var maxWait = TimeSpan.FromMilliseconds(responseTimeoutMs * 5); - var startTime = DateTime.UtcNow; - var hasReceivedAny = false; + textConsumer.Start(); + // ConfigureAwait(false): the lock is held, so resuming on a captured + // sync context (e.g. UI thread) would deadlock if that thread calls Disconnect(). 
+ await Task.Delay(50, cancellationToken).ConfigureAwait(false); - while (DateTime.UtcNow - startTime < maxWait) - { - var previousCount = collectedLines.Count; - await Task.Delay(50, cancellationToken); - if (collectedLines.Count > previousCount) + Trace.WriteLine($"[ExecuteTextCommandAsync] Text consumer started at {sw.ElapsedMilliseconds}ms"); + + // Execute the setup action (sends SCPI commands) + setupAction(); + + Trace.WriteLine($"[ExecuteTextCommandAsync] Setup action completed at {sw.ElapsedMilliseconds}ms"); + + // Wait for responses using a two-phase inactivity-based timeout: + // Phase 1: Wait up to responseTimeoutMs for the first response. + // Phase 2: After receiving data, wait completionTimeoutMs of inactivity to finish. + var lastMessageTime = DateTime.UtcNow; + var maxWait = TimeSpan.FromMilliseconds(responseTimeoutMs * 5); + var startTime = DateTime.UtcNow; + var hasReceivedAny = false; + + while (DateTime.UtcNow - startTime < maxWait) { - lastMessageTime = DateTime.UtcNow; - if (!hasReceivedAny) + var previousCount = collectedLines.Count; + await Task.Delay(50, cancellationToken).ConfigureAwait(false); + if (collectedLines.Count > previousCount) { - hasReceivedAny = true; - Trace.WriteLine($"[ExecuteTextCommandAsync] First response at {sw.ElapsedMilliseconds}ms"); + lastMessageTime = DateTime.UtcNow; + if (!hasReceivedAny) + { + hasReceivedAny = true; + Trace.WriteLine($"[ExecuteTextCommandAsync] First response at {sw.ElapsedMilliseconds}ms"); + } } - } - var elapsed = DateTime.UtcNow - lastMessageTime; + var elapsed = DateTime.UtcNow - lastMessageTime; - if (hasReceivedAny) - { - // Phase 2: short completion timeout after first data - if (elapsed >= TimeSpan.FromMilliseconds(completionTimeoutMs)) + if (hasReceivedAny) { - break; + // Phase 2: short completion timeout after first data + if (elapsed >= TimeSpan.FromMilliseconds(completionTimeoutMs)) + { + break; + } } - } - else - { - // Phase 1: full initial timeout waiting for first data - if 
(elapsed >= TimeSpan.FromMilliseconds(responseTimeoutMs)) + else { - break; + // Phase 1: full initial timeout waiting for first data + if (elapsed >= TimeSpan.FromMilliseconds(responseTimeoutMs)) + { + break; + } } } - } - Trace.WriteLine($"[ExecuteTextCommandAsync] Collection complete at {sw.ElapsedMilliseconds}ms, {collectedLines.Count} lines"); + Trace.WriteLine($"[ExecuteTextCommandAsync] Collection complete at {sw.ElapsedMilliseconds}ms, {collectedLines.Count} lines"); - // Stop the text consumer - textConsumer.StopSafely(); - } - finally - { - if (originalReadTimeout.HasValue && stream.CanTimeout) + // Stop the text consumer + textConsumer.StopSafely(); + } + finally { - try + if (originalReadTimeout.HasValue && stream.CanTimeout) { - stream.ReadTimeout = originalReadTimeout.Value; + try + { + stream.ReadTimeout = originalReadTimeout.Value; + } + catch + { + // Ignore failures when restoring timeout. + } } - catch + + // Restart the protobuf consumer + if (_messageConsumer != null) { - // Ignore failures when restoring timeout. + _messageConsumer.Start(); + _messageConsumer.MessageReceived += OnInboundMessageReceived; } + + Trace.WriteLine($"[ExecuteTextCommandAsync] Total elapsed: {sw.ElapsedMilliseconds}ms"); } - // Restart the protobuf consumer - if (_messageConsumer != null) + return collectedLines; + } + finally + { + _isInsideTextExchange.Value = false; + // Release can race with Dispose() — Dispose acquires the lock + // before disposing it, but if that acquisition timed out and + // Dispose proceeded anyway, our SemaphoreSlim handle is now + // gone. Treat that as a benign teardown signal rather than + // surfacing it from the finally and masking the original + // exception (if any) from the try body. 
+ try + { + _textExchangeLock.Release(); + } + catch (ObjectDisposedException) { - _messageConsumer.Start(); - _messageConsumer.MessageReceived += OnInboundMessageReceived; } - - Trace.WriteLine($"[ExecuteTextCommandAsync] Total elapsed: {sw.ElapsedMilliseconds}ms"); } - - return collectedLines; } /// @@ -595,6 +720,7 @@ public void Dispose() _messageConsumer?.Dispose(); _messageProducer?.Dispose(); _transport?.Dispose(); + _textExchangeLock.Dispose(); _disposed = true; } }