Skip to content

Commit de605ef

Browse files
authored
Log possibly unobserved exceptions in MQTT (#3319)
* Log possibly unobserved exceptions in MQTT * Apply other IDE suggestions
1 parent 62b3eb6 commit de605ef

File tree

1 file changed

+62
-56
lines changed

1 file changed

+62
-56
lines changed

iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs

Lines changed: 62 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -188,19 +188,12 @@ internal MqttTransportHandler(
188188
else
189189
{
190190
ClientOptions options = context.ClientOptions;
191-
switch (settings.GetTransportType())
191+
_channelFactory = settings.GetTransportType() switch
192192
{
193-
case TransportType.Mqtt_Tcp_Only:
194-
_channelFactory = CreateChannelFactory(iotHubConnectionString, settings, context.ProductInfo, options);
195-
break;
196-
197-
case TransportType.Mqtt_WebSocket_Only:
198-
_channelFactory = CreateWebSocketChannelFactory(iotHubConnectionString, settings, context.ProductInfo, options);
199-
break;
200-
201-
default:
202-
throw new InvalidOperationException("Unsupported Transport Setting {0}".FormatInvariant(settings.GetTransportType()));
203-
}
193+
TransportType.Mqtt_Tcp_Only => CreateChannelFactory(iotHubConnectionString, settings, context.ProductInfo, options),
194+
TransportType.Mqtt_WebSocket_Only => CreateWebSocketChannelFactory(iotHubConnectionString, settings, context.ProductInfo, options),
195+
_ => throw new InvalidOperationException("Unsupported Transport Setting {0}".FormatInvariant(settings.GetTransportType())),
196+
};
204197
}
205198

206199
_closeRetryPolicy = new RetryPolicy(new TransientErrorIgnoreStrategy(), 5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
@@ -277,37 +270,35 @@ public override async Task<Message> ReceiveAsync(CancellationToken cancellationT
277270

278271
return null;
279272
}
280-
else
281-
{
282-
try
283-
{
284-
if (Logging.IsEnabled)
285-
Logging.Enter(
286-
this,
287-
cancellationToken, $"ReceiveAsync() called with cancellation requested state of: {cancellationToken.IsCancellationRequested}",
288-
$"{nameof(ReceiveAsync)}");
289273

290-
cancellationToken.ThrowIfCancellationRequested();
274+
try
275+
{
276+
if (Logging.IsEnabled)
277+
Logging.Enter(
278+
this,
279+
cancellationToken, $"ReceiveAsync() called with cancellation requested state of: {cancellationToken.IsCancellationRequested}",
280+
$"{nameof(ReceiveAsync)}");
291281

292-
EnsureValidState();
282+
cancellationToken.ThrowIfCancellationRequested();
293283

294-
if (State != TransportState.Receiving)
295-
{
296-
await SubscribeCloudToDeviceMessagesAsync().ConfigureAwait(false);
297-
}
284+
EnsureValidState();
298285

299-
await WaitUntilC2dMessageArrivesAsync(cancellationToken).ConfigureAwait(false);
300-
return ProcessC2dMessage();
301-
}
302-
finally
286+
if (State != TransportState.Receiving)
303287
{
304-
if (Logging.IsEnabled)
305-
Logging.Exit(
306-
this,
307-
cancellationToken,
308-
$"Exiting ReceiveAsync() with cancellation requested state of: {cancellationToken.IsCancellationRequested}",
309-
$"{nameof(ReceiveAsync)}");
288+
await SubscribeCloudToDeviceMessagesAsync().ConfigureAwait(false);
310289
}
290+
291+
await WaitUntilC2dMessageArrivesAsync(cancellationToken).ConfigureAwait(false);
292+
return ProcessC2dMessage();
293+
}
294+
finally
295+
{
296+
if (Logging.IsEnabled)
297+
Logging.Exit(
298+
this,
299+
cancellationToken,
300+
$"Exiting ReceiveAsync() with cancellation requested state of: {cancellationToken.IsCancellationRequested}",
301+
$"{nameof(ReceiveAsync)}");
311302
}
312303
}
313304

@@ -320,6 +311,7 @@ public override async Task<Message> ReceiveAsync(TimeoutHelper timeoutHelper)
320311

321312
return null;
322313
}
314+
323315
try
324316
{
325317
if (Logging.IsEnabled)
@@ -405,7 +397,7 @@ public override async Task CompleteAsync(string lockToken, CancellationToken can
405397
isTransient: false);
406398
}
407399

408-
if (_completionQueue.Count == 0)
400+
if (_completionQueue.IsEmpty)
409401
{
410402
throw new IotHubException("Unknown lock token.", isTransient: false);
411403
}
@@ -444,9 +436,10 @@ protected override void Dispose(bool disposing)
444436
try
445437
{
446438
if (Logging.IsEnabled)
447-
{
448-
Logging.Enter(this, $"{nameof(DefaultDelegatingHandler)}.Disposed={_isDisposed}; disposing={disposing}", $"{nameof(MqttTransportHandler)}.{nameof(Dispose)}");
449-
}
439+
Logging.Enter(
440+
this,
441+
$"{nameof(DefaultDelegatingHandler)}.Disposed={_isDisposed}; disposing={disposing}",
442+
$"{nameof(MqttTransportHandler)}.{nameof(Dispose)}");
450443

451444
if (!_isDisposed)
452445
{
@@ -458,6 +451,18 @@ protected override void Dispose(bool disposing)
458451
CleanUpAsync().GetAwaiter().GetResult();
459452
}
460453

454+
// Log the task completion source tasks' exceptions and avoid unobserved exceptions.
455+
if (_connectCompletion.Task.Exception != null)
456+
{
457+
if (Logging.IsEnabled)
458+
Logging.Error(this, $"{_connectCompletion} has exception {_connectCompletion.Task.Exception}", nameof(Dispose));
459+
}
460+
if (_subscribeCompletionSource.Task.Exception != null)
461+
{
462+
if (Logging.IsEnabled)
463+
Logging.Error(this, $"{_subscribeCompletionSource} has exception {_subscribeCompletionSource.Task.Exception}", nameof(Dispose));
464+
}
465+
461466
_disconnectAwaitersCancellationSource?.Dispose();
462467
_disconnectAwaitersCancellationSource = null;
463468

@@ -480,9 +485,10 @@ protected override void Dispose(bool disposing)
480485
finally
481486
{
482487
if (Logging.IsEnabled)
483-
{
484-
Logging.Exit(this, $"{nameof(DefaultDelegatingHandler)}.Disposed={_isDisposed}; disposing={disposing}", $"{nameof(MqttTransportHandler)}.{nameof(Dispose)}");
485-
}
488+
Logging.Exit(
489+
this,
490+
$"{nameof(DefaultDelegatingHandler)}.Disposed={_isDisposed}; disposing={disposing}",
491+
$"{nameof(MqttTransportHandler)}.{nameof(Dispose)}");
486492
}
487493
}
488494

@@ -726,8 +732,9 @@ public async void OnError(Exception exception)
726732
break;
727733

728734
default:
729-
Debug.Fail($"Unknown transport state: {previousState}");
730-
throw new InvalidOperationException();
735+
string error = $"Unknown transport state: {previousState}";
736+
Debug.Fail(error);
737+
throw new InvalidOperationException(error);
731738
}
732739

733740
await _closeRetryPolicy.RunWithRetryAsync(CleanUpImplAsync).ConfigureAwait(true);
@@ -753,6 +760,7 @@ private TransportState MoveToStateIfPossible(TransportState destination, Transpo
753760
{
754761
return previousState;
755762
}
763+
756764
TransportState prevState;
757765
if ((prevState = (TransportState)Interlocked.CompareExchange(ref _state, (int)destination, (int)previousState)) == previousState)
758766
{
@@ -944,11 +952,9 @@ public override async Task<Twin> SendTwinGetAsync(CancellationToken cancellation
944952
Properties = JsonConvert.DeserializeObject<TwinProperties>(body),
945953
};
946954
}
947-
catch (JsonReaderException ex)
955+
catch (JsonReaderException ex) when (Logging.IsEnabled)
948956
{
949-
if (Logging.IsEnabled)
950-
Logging.Error(this, $"Failed to parse Twin JSON: {ex}. Message body: '{body}'");
951-
957+
Logging.Error(this, $"Failed to parse Twin JSON: {ex}. Message body: '{body}'");
952958
throw;
953959
}
954960
}
@@ -1117,7 +1123,7 @@ private async Task<Message> SendTwinRequestAsync(Message request, string rid, Ca
11171123
Message response = null; ;
11181124
ExceptionDispatchInfo responseException = null;
11191125

1120-
Action<Message> onTwinResponse = (Message possibleResponse) =>
1126+
void OnTwinResponse(Message possibleResponse)
11211127
{
11221128
try
11231129
{
@@ -1152,11 +1158,11 @@ private async Task<Message> SendTwinRequestAsync(Message request, string rid, Ca
11521158
responseException = ExceptionDispatchInfo.Capture(e);
11531159
responseReceived.Release();
11541160
}
1155-
};
1161+
}
11561162

11571163
try
11581164
{
1159-
_twinResponseEvent += onTwinResponse;
1165+
_twinResponseEvent += OnTwinResponse;
11601166

11611167
await SendEventAsync(request, cancellationToken).ConfigureAwait(false);
11621168

@@ -1175,7 +1181,7 @@ private async Task<Message> SendTwinRequestAsync(Message request, string rid, Ca
11751181
}
11761182
finally
11771183
{
1178-
_twinResponseEvent -= onTwinResponse;
1184+
_twinResponseEvent -= OnTwinResponse;
11791185
}
11801186
}
11811187

@@ -1185,7 +1191,7 @@ private Func<IPAddress[], int, Task<IChannel>> CreateChannelFactory(IotHubConnec
11851191
{
11861192
IChannel channel = null;
11871193

1188-
Func<Stream, SslStream> streamFactory = stream => new SslStream(stream, true, settings.RemoteCertificateValidationCallback);
1194+
SslStream StreamFactory(Stream stream) => new SslStream(stream, true, settings.RemoteCertificateValidationCallback);
11891195

11901196
List<X509Certificate> certs = settings.ClientCertificate == null
11911197
? new List<X509Certificate>(0)
@@ -1216,7 +1222,7 @@ private Func<IPAddress[], int, Task<IChannel>> CreateChannelFactory(IotHubConnec
12161222
.Option(ChannelOption.Allocator, UnpooledByteBufferAllocator.Default)
12171223
.Handler(new ActionChannelInitializer<ISocketChannel>(ch =>
12181224
{
1219-
var tlsHandler = new TlsHandler(streamFactory, clientTlsSettings);
1225+
var tlsHandler = new TlsHandler(StreamFactory, clientTlsSettings);
12201226
ch.Pipeline.AddLast(
12211227
tlsHandler,
12221228
MqttEncoder.Instance,

0 commit comments

Comments
 (0)