From 5629ce3115feb65697749a8e05ff54a0bc75520d Mon Sep 17 00:00:00 2001 From: Anton Smolkov Date: Sun, 1 Dec 2024 16:42:50 +0300 Subject: [PATCH 1/7] Improve session message injection methods --- .../MqttPendingMessagesOverflowException.cs | 19 ++++++++ Source/MQTTnet.Server/Internal/MqttSession.cs | 14 ++++-- .../Status/MqttSessionStatus.cs | 44 +++++++++++++++++-- Source/MQTTnet/Internal/MqttPacketBus.cs | 8 ---- Source/MQTTnet/Internal/MqttPacketBusItem.cs | 8 ++-- 5 files changed, 73 insertions(+), 20 deletions(-) create mode 100644 Source/MQTTnet.Server/Exceptions/MqttPendingMessagesOverflowException.cs diff --git a/Source/MQTTnet.Server/Exceptions/MqttPendingMessagesOverflowException.cs b/Source/MQTTnet.Server/Exceptions/MqttPendingMessagesOverflowException.cs new file mode 100644 index 000000000..d17708240 --- /dev/null +++ b/Source/MQTTnet.Server/Exceptions/MqttPendingMessagesOverflowException.cs @@ -0,0 +1,19 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +namespace MQTTnet.Server.Exceptions +{ + public class MqttPendingMessagesOverflowException : Exception + { + public string SessionId { get; } + public MqttPendingMessagesOverflowStrategy OverflowStrategy { get; } + + public MqttPendingMessagesOverflowException(string sessionId, MqttPendingMessagesOverflowStrategy overflowStrategy) + : base($"Send buffer max pending messages overflow occurred for session '{sessionId}'. Strategy: {overflowStrategy}.") + { + SessionId = sessionId; + OverflowStrategy = overflowStrategy; + } + } +} \ No newline at end of file diff --git a/Source/MQTTnet.Server/Internal/MqttSession.cs b/Source/MQTTnet.Server/Internal/MqttSession.cs index 0aea7d803..30fa0b84d 100644 --- a/Source/MQTTnet.Server/Internal/MqttSession.cs +++ b/Source/MQTTnet.Server/Internal/MqttSession.cs @@ -6,6 +6,7 @@ using MQTTnet.Internal; using MQTTnet.Packets; using MQTTnet.Protocol; +using MQTTnet.Server.Exceptions; namespace MQTTnet.Server.Internal; @@ -109,10 +110,11 @@ public void EnqueueControlPacket(MqttPacketBusItem packetBusItem) public EnqueueDataPacketResult EnqueueDataPacket(MqttPacketBusItem packetBusItem) { - if (_packetBus.ItemsCount(MqttPacketBusPartition.Data) >= _serverOptions.MaxPendingMessagesPerClient) + if (PendingDataPacketsCount >= _serverOptions.MaxPendingMessagesPerClient) { if (_serverOptions.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage) { + packetBusItem.Fail(new MqttPendingMessagesOverflowException(Id, _serverOptions.PendingMessagesOverflowStrategy)); return EnqueueDataPacketResult.Dropped; } @@ -121,10 +123,14 @@ public EnqueueDataPacketResult EnqueueDataPacket(MqttPacketBusItem packetBusItem // Only drop from the data partition. Dropping from control partition might break the connection // because the client does not receive PINGREQ packets etc. any longer. var firstItem = _packetBus.DropFirstItem(MqttPacketBusPartition.Data); - if (firstItem != null && _eventContainer.QueuedApplicationMessageOverwrittenEvent.HasHandlers) + if (firstItem != null) { - var eventArgs = new QueueMessageOverwrittenEventArgs(Id, firstItem.Packet); - _eventContainer.QueuedApplicationMessageOverwrittenEvent.InvokeAsync(eventArgs).ConfigureAwait(false); + firstItem.Fail(new MqttPendingMessagesOverflowException(Id, _serverOptions.PendingMessagesOverflowStrategy)); + if (_eventContainer.QueuedApplicationMessageOverwrittenEvent.HasHandlers) + { + var eventArgs = new QueueMessageOverwrittenEventArgs(Id, firstItem.Packet); + _eventContainer.QueuedApplicationMessageOverwrittenEvent.InvokeAsync(eventArgs).ConfigureAwait(false); + } } } } diff --git a/Source/MQTTnet.Server/Status/MqttSessionStatus.cs b/Source/MQTTnet.Server/Status/MqttSessionStatus.cs index ecb8460fb..aa866c259 100644 --- a/Source/MQTTnet.Server/Status/MqttSessionStatus.cs +++ b/Source/MQTTnet.Server/Status/MqttSessionStatus.cs @@ -4,6 +4,7 @@ using System.Collections; using MQTTnet.Internal; +using MQTTnet.Packets; using MQTTnet.Server.Internal; using MQTTnet.Server.Internal.Formatter; @@ -40,22 +41,57 @@ public Task DeleteAsync() return _session.DeleteAsync(); } - public Task DeliverApplicationMessageAsync(MqttApplicationMessage applicationMessage) + /// + /// Delivers an application message immediately to the session. + /// + /// The application message to deliver. + /// + /// A task that represents the asynchronous operation. The result contains the delivered MQTT publish packet. + /// + public async Task DeliverApplicationMessageAsync(MqttApplicationMessage applicationMessage) { ArgumentNullException.ThrowIfNull(applicationMessage); var packetBusItem = new MqttPacketBusItem(MqttPublishPacketFactory.Create(applicationMessage)); _session.EnqueueDataPacket(packetBusItem); - return packetBusItem.WaitAsync(); + var mqttPacket = await packetBusItem.WaitAsync(); + + return (MqttPublishPacket)mqttPacket; } - public Task EnqueueApplicationMessageAsync(MqttApplicationMessage applicationMessage) + /// + /// Attempts to enqueue an application message to the session's send buffer. + /// + /// The application message to enqueue. + /// The corresponding MQTT publish packed, if the operation was successful. + /// true if the message was successfully enqueued; otherwise, false. + /// + /// When is set to , + /// this method always returns true. + /// However, an existing message in the queue may be dropped later to make room for the newly enqueued message. + /// Such dropped messages can be tracked by subscribing to event. + /// + public bool TryEnqueueApplicationMessage(MqttApplicationMessage applicationMessage, out MqttPublishPacket publishPacket) { ArgumentNullException.ThrowIfNull(applicationMessage); - _session.EnqueueDataPacket(new MqttPacketBusItem(MqttPublishPacketFactory.Create(applicationMessage))); + publishPacket = MqttPublishPacketFactory.Create(applicationMessage); + var enqueueDataPacketResult = _session.EnqueueDataPacket(new MqttPacketBusItem(publishPacket)); + if (enqueueDataPacketResult == EnqueueDataPacketResult.Enqueued) + { + return true; + } + + publishPacket = null; + return false; + } + + [Obsolete("This method is obsolete. Use TryEnqueueApplicationMessage instead.")] + public Task EnqueueApplicationMessageAsync(MqttApplicationMessage applicationMessage) + { + TryEnqueueApplicationMessage(applicationMessage, out _); return CompletedTask.Instance; } } \ No newline at end of file diff --git a/Source/MQTTnet/Internal/MqttPacketBus.cs b/Source/MQTTnet/Internal/MqttPacketBus.cs index 8aec37565..46e2b3493 100644 --- a/Source/MQTTnet/Internal/MqttPacketBus.cs +++ b/Source/MQTTnet/Internal/MqttPacketBus.cs @@ -141,14 +141,6 @@ public List ExportPackets(MqttPacketBusPartition partition) } } - public int ItemsCount(MqttPacketBusPartition partition) - { - lock (_syncRoot) - { - return _partitions[(int)partition].Count; - } - } - public int PartitionItemsCount(MqttPacketBusPartition partition) { lock (_syncRoot) diff --git a/Source/MQTTnet/Internal/MqttPacketBusItem.cs b/Source/MQTTnet/Internal/MqttPacketBusItem.cs index b94654cf5..6fb5b114d 100644 --- a/Source/MQTTnet/Internal/MqttPacketBusItem.cs +++ b/Source/MQTTnet/Internal/MqttPacketBusItem.cs @@ -10,8 +10,8 @@ namespace MQTTnet.Internal { public sealed class MqttPacketBusItem { - readonly AsyncTaskCompletionSource _promise = new AsyncTaskCompletionSource(); - + readonly AsyncTaskCompletionSource _promise = new AsyncTaskCompletionSource(); + public MqttPacketBusItem(MqttPacket packet) { Packet = packet ?? throw new ArgumentNullException(nameof(packet)); @@ -28,7 +28,7 @@ public void Cancel() public void Complete() { - _promise.TrySetResult(true); + _promise.TrySetResult(Packet); Completed?.Invoke(this, EventArgs.Empty); } @@ -37,7 +37,7 @@ public void Fail(Exception exception) _promise.TrySetException(exception); } - public Task WaitAsync() + public Task WaitAsync() { return _promise.Task; } From e6ae6b31ba42b6c1ecabecd7c0b3ad110fad3f85 Mon Sep 17 00:00:00 2001 From: Anton Smolkov Date: Sun, 1 Dec 2024 16:43:00 +0300 Subject: [PATCH 2/7] Add more tests for session injection --- .../Options/MqttServerOptionsBuilder.cs | 6 + Source/MQTTnet.Tests/BaseTestClass.cs | 7 +- .../MQTTnet.Tests/Mockups/TestEnvironment.cs | 8 +- .../MQTTnet.Tests/Server/Injection_Tests.cs | 447 ++++++++++++++++-- 4 files changed, 420 insertions(+), 48 deletions(-) diff --git a/Source/MQTTnet.Server/Options/MqttServerOptionsBuilder.cs b/Source/MQTTnet.Server/Options/MqttServerOptionsBuilder.cs index 2e86e21eb..c238ea016 100644 --- a/Source/MQTTnet.Server/Options/MqttServerOptionsBuilder.cs +++ b/Source/MQTTnet.Server/Options/MqttServerOptionsBuilder.cs @@ -115,6 +115,12 @@ public MqttServerOptionsBuilder WithMaxPendingMessagesPerClient(int value) return this; } + public MqttServerOptionsBuilder WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy value) + { + _options.PendingMessagesOverflowStrategy = value; + return this; + } + public MqttServerOptionsBuilder WithoutDefaultEndpoint() { _options.DefaultEndpointOptions.IsEnabled = false; diff --git a/Source/MQTTnet.Tests/BaseTestClass.cs b/Source/MQTTnet.Tests/BaseTestClass.cs index 8e5248e7f..00291b024 100644 --- a/Source/MQTTnet.Tests/BaseTestClass.cs +++ b/Source/MQTTnet.Tests/BaseTestClass.cs @@ -13,10 +13,11 @@ namespace MQTTnet.Tests public abstract class BaseTestClass { public TestContext TestContext { get; set; } - - protected TestEnvironment CreateTestEnvironment(MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311) + + protected TestEnvironment CreateTestEnvironment( + MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311, bool trackUnobservedTaskException = true) { - return new TestEnvironment(TestContext, protocolVersion); + return new TestEnvironment(TestContext, protocolVersion, trackUnobservedTaskException); } protected Task LongTestDelay() diff --git a/Source/MQTTnet.Tests/Mockups/TestEnvironment.cs b/Source/MQTTnet.Tests/Mockups/TestEnvironment.cs index 4f1391f15..6f2d8ae73 100644 --- a/Source/MQTTnet.Tests/Mockups/TestEnvironment.cs +++ b/Source/MQTTnet.Tests/Mockups/TestEnvironment.cs @@ -32,12 +32,16 @@ public TestEnvironment() : this(null) { } - public TestEnvironment(TestContext testContext, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311) + public TestEnvironment( + TestContext testContext, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311, bool trackUnobservedTaskException = true) { _protocolVersion = protocolVersion; TestContext = testContext; - TaskScheduler.UnobservedTaskException += TrackUnobservedTaskException; + if (trackUnobservedTaskException) + { + TaskScheduler.UnobservedTaskException += TrackUnobservedTaskException; + } ServerLogger.LogMessagePublished += (s, e) => { diff --git a/Source/MQTTnet.Tests/Server/Injection_Tests.cs b/Source/MQTTnet.Tests/Server/Injection_Tests.cs index cefbc34dd..7ed1fc041 100644 --- a/Source/MQTTnet.Tests/Server/Injection_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Injection_Tests.cs @@ -1,7 +1,10 @@ +using System; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Internal; +using MQTTnet.Packets; using MQTTnet.Server; +using MQTTnet.Server.Exceptions; namespace MQTTnet.Tests.Server { @@ -9,79 +12,437 @@ namespace MQTTnet.Tests.Server public sealed class Injection_Tests : BaseTestClass { [TestMethod] - public async Task Inject_Application_Message_At_Session_Level() + public async Task Enqueue_Application_Message_At_Session_Level() { - using (var testEnvironment = CreateTestEnvironment()) + using var testEnvironment = CreateTestEnvironment(); + + var server = await testEnvironment.StartServer(); + var receiver1 = await testEnvironment.ConnectClient(); + var receiver2 = await testEnvironment.ConnectClient(); + var messageReceivedHandler1 = testEnvironment.CreateApplicationMessageHandler(receiver1); + var messageReceivedHandler2 = testEnvironment.CreateApplicationMessageHandler(receiver2); + + var status = await server.GetSessionsAsync(); + var clientStatus = status[0]; + + await receiver1.SubscribeAsync("#"); + await receiver2.SubscribeAsync("#"); + + var message = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(); + var enqueued = clientStatus.TryEnqueueApplicationMessage(message, out var publishPacket); + + Assert.IsTrue(enqueued); + + await LongTestDelay(); + + Assert.AreEqual(1, messageReceivedHandler1.ReceivedEventArgs.Count); + Assert.AreEqual(publishPacket.PacketIdentifier, messageReceivedHandler1.ReceivedEventArgs[0].PacketIdentifier); + Assert.AreEqual("InjectedOne", messageReceivedHandler1.ReceivedEventArgs[0].ApplicationMessage.Topic); + + // The second receiver should NOT receive the message. + Assert.AreEqual(0, messageReceivedHandler2.ReceivedEventArgs.Count); + } + + [TestMethod] + public async Task Enqueue_Application_Message_At_Session_Level_QueueOverflow_DropNewMessageStrategy() + { + using var testEnvironment = CreateTestEnvironment(trackUnobservedTaskException: false); + + var server = await testEnvironment.StartServer( + builder => builder + .WithMaxPendingMessagesPerClient(1) + .WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropNewMessage)); + + var receiver = await testEnvironment.ConnectClient(); + + var firstMessageOutboundPacketInterceptedTcs = new TaskCompletionSource(); + server.InterceptingOutboundPacketAsync += async args => { - var server = await testEnvironment.StartServer(); - var receiver1 = await testEnvironment.ConnectClient(); - var receiver2 = await testEnvironment.ConnectClient(); - var messageReceivedHandler1 = testEnvironment.CreateApplicationMessageHandler(receiver1); - var messageReceivedHandler2 = testEnvironment.CreateApplicationMessageHandler(receiver2); + // - The first message is dequeued normally and calls this delay + // - The second message fills the outbound queue + // - The third message overflows the outbound queue + if (args.Packet is MqttPublishPacket) + { + firstMessageOutboundPacketInterceptedTcs.SetResult(); + await Task.Delay(TimeSpan.FromDays(1), args.CancellationToken); + } + }; - var status = await server.GetSessionsAsync(); - var clientStatus = status[0]; + var firstMessageEvicted = false; + var secondMessageEvicted = false; + var thirdMessageEvicted = false; + + server.QueuedApplicationMessageOverwrittenAsync += args => + { + if (args.Packet is not MqttPublishPacket publishPacket) + { + return Task.CompletedTask; + } + + switch (publishPacket.Topic) + { + case "InjectedOne": + firstMessageEvicted = true; + break; + case "InjectedTwo": + secondMessageEvicted = true; + break; + case "InjectedThree": + thirdMessageEvicted = true; + break; + } - await receiver1.SubscribeAsync("#"); - await receiver2.SubscribeAsync("#"); + return Task.CompletedTask; + }; - await clientStatus.EnqueueApplicationMessageAsync(new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build()); + var status = await server.GetSessionsAsync(); + var clientStatus = status[0]; + await receiver.SubscribeAsync("#"); - await LongTestDelay(); + var firstMessageEnqueued = clientStatus.TryEnqueueApplicationMessage( + new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(), out _); + await firstMessageOutboundPacketInterceptedTcs.Task; - Assert.AreEqual(1, messageReceivedHandler1.ReceivedEventArgs.Count); - Assert.AreEqual("InjectedOne", messageReceivedHandler1.ReceivedEventArgs[0].ApplicationMessage.Topic); + var secondMessageEnqueued = clientStatus.TryEnqueueApplicationMessage( + new MqttApplicationMessageBuilder().WithTopic("InjectedTwo").Build(), out _); - // The second receiver should NOT receive the message. - Assert.AreEqual(0, messageReceivedHandler2.ReceivedEventArgs.Count); - } + var thirdMessageEnqueued = clientStatus.TryEnqueueApplicationMessage( + new MqttApplicationMessageBuilder().WithTopic("InjectedThree").Build(), out _); + + // Due to the DropNewMessage strategy the third message will not be enqueued. + // As a result, no existing messages in the queue will be dropped (evicted). + Assert.IsTrue(firstMessageEnqueued); + Assert.IsTrue(secondMessageEnqueued); + Assert.IsFalse(thirdMessageEnqueued); + + Assert.IsFalse(firstMessageEvicted); + Assert.IsFalse(secondMessageEvicted); + Assert.IsFalse(thirdMessageEvicted); } + [TestMethod] - public async Task Inject_ApplicationMessage_At_Server_Level() + public async Task Enqueue_Application_Message_At_Session_Level_QueueOverflow_DropOldestQueuedMessageStrategy() { - using (var testEnvironment = CreateTestEnvironment()) + using var testEnvironment = CreateTestEnvironment(trackUnobservedTaskException: false); + + var server = await testEnvironment.StartServer( + builder => builder + .WithMaxPendingMessagesPerClient(1) + .WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)); + + var receiver = await testEnvironment.ConnectClient(); + + var firstMessageOutboundPacketInterceptedTcs = new TaskCompletionSource(); + server.InterceptingOutboundPacketAsync += async args => { - var server = await testEnvironment.StartServer(); + // - The first message is dequeued normally and calls this delay + // - The second message fills the outbound queue + // - The third message overflows the outbound queue + if (args.Packet is MqttPublishPacket) + { + firstMessageOutboundPacketInterceptedTcs.SetResult(); + await Task.Delay(TimeSpan.FromDays(1), args.CancellationToken); + } + }; + + var firstMessageEvicted = false; + var secondMessageEvicted = false; + var thirdMessageEvicted = false; + + server.QueuedApplicationMessageOverwrittenAsync += args => + { + if (args.Packet is not MqttPublishPacket publishPacket) + { + return Task.CompletedTask; + } + + switch (publishPacket.Topic) + { + case "InjectedOne": + firstMessageEvicted = true; + break; + case "InjectedTwo": + secondMessageEvicted = true; + break; + case "InjectedThree": + thirdMessageEvicted = true; + break; + } - var receiver = await testEnvironment.ConnectClient(); + return Task.CompletedTask; + }; - var messageReceivedHandler = testEnvironment.CreateApplicationMessageHandler(receiver); + var status = await server.GetSessionsAsync(); + var clientStatus = status[0]; + await receiver.SubscribeAsync("#"); - await receiver.SubscribeAsync("#"); + var firstMessageEnqueued = clientStatus.TryEnqueueApplicationMessage( + new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(), out _); + await firstMessageOutboundPacketInterceptedTcs.Task; - var injectedApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(); + var secondMessageEnqueued = clientStatus.TryEnqueueApplicationMessage( + new MqttApplicationMessageBuilder().WithTopic("InjectedTwo").Build(), out _); - await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(injectedApplicationMessage)); + var thirdMessageEnqueued = clientStatus.TryEnqueueApplicationMessage( + new MqttApplicationMessageBuilder().WithTopic("InjectedThree").Build(), out _); - await LongTestDelay(); + // Due to the DropOldestQueuedMessage strategy, all messages will be enqueued initially. + // But the second message will eventually be dropped (evicted) to make room for the third one. + Assert.IsTrue(firstMessageEnqueued); + Assert.IsTrue(secondMessageEnqueued); + Assert.IsTrue(thirdMessageEnqueued); - Assert.AreEqual(1, messageReceivedHandler.ReceivedEventArgs.Count); - Assert.AreEqual("InjectedOne", messageReceivedHandler.ReceivedEventArgs[0].ApplicationMessage.Topic); - } + Assert.IsFalse(firstMessageEvicted); + Assert.IsTrue(secondMessageEvicted); + Assert.IsFalse(thirdMessageEvicted); } [TestMethod] - public async Task Intercept_Injected_Application_Message() + public async Task Deliver_Application_Message_At_Session_Level() { - using (var testEnvironment = CreateTestEnvironment()) + using var testEnvironment = CreateTestEnvironment(); + + var server = await testEnvironment.StartServer(); + var receiver1 = await testEnvironment.ConnectClient(); + var receiver2 = await testEnvironment.ConnectClient(); + var messageReceivedHandler1 = testEnvironment.CreateApplicationMessageHandler(receiver1); + var messageReceivedHandler2 = testEnvironment.CreateApplicationMessageHandler(receiver2); + + var status = await server.GetSessionsAsync(); + var clientStatus = status[0]; + + await receiver1.SubscribeAsync("#"); + await receiver2.SubscribeAsync("#"); + + var mqttApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(); + var publishPacket = await clientStatus.DeliverApplicationMessageAsync(mqttApplicationMessage); + + await LongTestDelay(); + + Assert.AreEqual(1, messageReceivedHandler1.ReceivedEventArgs.Count); + Assert.AreEqual(publishPacket.PacketIdentifier, messageReceivedHandler1.ReceivedEventArgs[0].PacketIdentifier); + Assert.AreEqual("InjectedOne", messageReceivedHandler1.ReceivedEventArgs[0].ApplicationMessage.Topic); + + // The second receiver should NOT receive the message. + Assert.AreEqual(0, messageReceivedHandler2.ReceivedEventArgs.Count); + } + + [TestMethod] + public async Task Deliver_Application_Message_At_Session_Level_QueueOverflow_DropNewMessageStrategy() + { + using var testEnvironment = CreateTestEnvironment(); + + var server = await testEnvironment.StartServer( + builder => builder + .WithMaxPendingMessagesPerClient(1) + .WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropNewMessage)); + + var receiver = await testEnvironment.ConnectClient(); + + var firstMessageOutboundPacketInterceptedTcs = new TaskCompletionSource(); + server.InterceptingOutboundPacketAsync += async args => { - var server = await testEnvironment.StartServer(); + // - The first message is dequeued normally and calls this delay + // - The second message fills the outbound queue + // - The third message overflows the outbound queue + if (args.Packet is MqttPublishPacket) + { + firstMessageOutboundPacketInterceptedTcs.SetResult(); + await Task.Delay(TimeSpan.FromDays(1), args.CancellationToken); + } + }; - MqttApplicationMessage interceptedMessage = null; - server.InterceptingPublishAsync += eventArgs => + var firstMessageEvicted = false; + var secondMessageEvicted = false; + var thirdMessageEvicted = false; + + server.QueuedApplicationMessageOverwrittenAsync += args => + { + if (args.Packet is not MqttPublishPacket publishPacket) { - interceptedMessage = eventArgs.ApplicationMessage; - return CompletedTask.Instance; - }; + return Task.CompletedTask; + } + + switch (publishPacket.Topic) + { + case "InjectedOne": + firstMessageEvicted = true; + break; + case "InjectedTwo": + secondMessageEvicted = true; + break; + case "InjectedThree": + thirdMessageEvicted = true; + break; + } + + return Task.CompletedTask; + }; + + var status = await server.GetSessionsAsync(); + var clientStatus = status[0]; + await receiver.SubscribeAsync("#"); + + var firstMessageTask = Task.Run( + () => clientStatus.DeliverApplicationMessageAsync( + new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build())); + await LongTestDelay(); + await firstMessageOutboundPacketInterceptedTcs.Task; + + var secondMessageTask = Task.Run( + () => clientStatus.DeliverApplicationMessageAsync( + new MqttApplicationMessageBuilder().WithTopic("InjectedTwo").Build())); + await LongTestDelay(); + + var thirdMessageTask = Task.Run( + () => clientStatus.DeliverApplicationMessageAsync( + new MqttApplicationMessageBuilder().WithTopic("InjectedThree").Build())); + await LongTestDelay(); + + Task.WaitAny(firstMessageTask, secondMessageTask, thirdMessageTask); + + // Due to the DropNewMessage strategy the third message delivery will fail. + // As a result, no existing messages in the queue will be dropped (evicted). + Assert.AreEqual(firstMessageTask.Status, TaskStatus.WaitingForActivation); + Assert.AreEqual(secondMessageTask.Status, TaskStatus.WaitingForActivation); + Assert.AreEqual(thirdMessageTask.Status, TaskStatus.Faulted); + Assert.IsTrue(thirdMessageTask.Exception?.InnerException is MqttPendingMessagesOverflowException); + + Assert.IsFalse(firstMessageEvicted); + Assert.IsFalse(secondMessageEvicted); + Assert.IsFalse(thirdMessageEvicted); + } + + [TestMethod] + public async Task Deliver_Application_Message_At_Session_Level_QueueOverflow_DropOldestQueuedMessageStrategy() + { + using var testEnvironment = CreateTestEnvironment(trackUnobservedTaskException: false); + + var server = await testEnvironment.StartServer( + builder => builder + .WithMaxPendingMessagesPerClient(1) + .WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)); + + var receiver = await testEnvironment.ConnectClient(); + + var firstMessageOutboundPacketInterceptedTcs = new TaskCompletionSource(); + server.InterceptingOutboundPacketAsync += async args => + { + // - The first message is dequeued normally and calls this delay + // - The second message fills the outbound queue + // - The third message overflows the outbound queue + if (args.Packet is MqttPublishPacket) + { + firstMessageOutboundPacketInterceptedTcs.SetResult(); + await Task.Delay(TimeSpan.FromDays(1), args.CancellationToken); + } + }; + + var firstMessageEvicted = false; + var secondMessageEvicted = false; + var thirdMessageEvicted = false; + + server.QueuedApplicationMessageOverwrittenAsync += args => + { + if (args.Packet is not MqttPublishPacket publishPacket) + { + return Task.CompletedTask; + } + + switch (publishPacket.Topic) + { + case "InjectedOne": + firstMessageEvicted = true; + break; + case "InjectedTwo": + secondMessageEvicted = true; + break; + case "InjectedThree": + thirdMessageEvicted = true; + break; + } + + return Task.CompletedTask; + }; + + var status = await server.GetSessionsAsync(); + var clientStatus = status[0]; + await receiver.SubscribeAsync("#"); + + var firstMessageTask = Task.Run( + () => clientStatus.DeliverApplicationMessageAsync( + new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build())); + await LongTestDelay(); + await firstMessageOutboundPacketInterceptedTcs.Task; + + var secondMessageTask = Task.Run( + () => clientStatus.DeliverApplicationMessageAsync( + new MqttApplicationMessageBuilder().WithTopic("InjectedTwo").Build())); + await LongTestDelay(); + + var thirdMessageTask = Task.Run( + () => clientStatus.DeliverApplicationMessageAsync( + new MqttApplicationMessageBuilder().WithTopic("InjectedThree").Build())); + await LongTestDelay(); + + Task.WaitAny(firstMessageTask, secondMessageTask, thirdMessageTask); + + // Due to the DropOldestQueuedMessage strategy, the second message delivery will fail + // to make room for the third one. + Assert.AreEqual(firstMessageTask.Status, TaskStatus.WaitingForActivation); + Assert.AreEqual(secondMessageTask.Status, TaskStatus.Faulted); + Assert.IsTrue(secondMessageTask.Exception?.InnerException is MqttPendingMessagesOverflowException); + Assert.AreEqual(thirdMessageTask.Status, TaskStatus.WaitingForActivation); + + Assert.IsFalse(firstMessageEvicted); + Assert.IsTrue(secondMessageEvicted); + Assert.IsFalse(thirdMessageEvicted); + } + + [TestMethod] + public async Task Inject_ApplicationMessage_At_Server_Level() + { + using var testEnvironment = CreateTestEnvironment(); + + var server = await testEnvironment.StartServer(); + + var receiver = await testEnvironment.ConnectClient(); + + var messageReceivedHandler = testEnvironment.CreateApplicationMessageHandler(receiver); + + await receiver.SubscribeAsync("#"); + + var injectedApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(); + + await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(injectedApplicationMessage)); + + await LongTestDelay(); + + Assert.AreEqual(1, messageReceivedHandler.ReceivedEventArgs.Count); + Assert.AreEqual("InjectedOne", messageReceivedHandler.ReceivedEventArgs[0].ApplicationMessage.Topic); + } + + [TestMethod] + public async Task Intercept_Injected_Application_Message() + { + using var testEnvironment = CreateTestEnvironment(); + + var server = await testEnvironment.StartServer(); + + MqttApplicationMessage interceptedMessage = null; + server.InterceptingPublishAsync += eventArgs => + { + interceptedMessage = eventArgs.ApplicationMessage; + return CompletedTask.Instance; + }; - var injectedApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(); - await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(injectedApplicationMessage)); + var injectedApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(); + await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(injectedApplicationMessage)); - await LongTestDelay(); + await LongTestDelay(); - Assert.IsNotNull(interceptedMessage); - } + Assert.IsNotNull(interceptedMessage); } } } \ No newline at end of file From ebbdd535421615607e584344c9047008b8d38b13 Mon Sep 17 00:00:00 2001 From: Anton Smolkov Date: Mon, 2 Dec 2024 19:33:11 +0300 Subject: [PATCH 3/7] Change return type from MqttPublishPacket to MqttPacketWithIdentifier --- Source/MQTTnet.Server/Status/MqttSessionStatus.cs | 6 +++--- Source/MQTTnet.Tests/Server/Injection_Tests.cs | 11 +++++++++-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/Source/MQTTnet.Server/Status/MqttSessionStatus.cs b/Source/MQTTnet.Server/Status/MqttSessionStatus.cs index aa866c259..2b64b632a 100644 --- a/Source/MQTTnet.Server/Status/MqttSessionStatus.cs +++ b/Source/MQTTnet.Server/Status/MqttSessionStatus.cs @@ -48,7 +48,7 @@ public Task DeleteAsync() /// /// A task that represents the asynchronous operation. The result contains the delivered MQTT publish packet. /// - public async Task DeliverApplicationMessageAsync(MqttApplicationMessage applicationMessage) + public async Task DeliverApplicationMessageAsync(MqttApplicationMessage applicationMessage) { ArgumentNullException.ThrowIfNull(applicationMessage); @@ -57,7 +57,7 @@ public async Task DeliverApplicationMessageAsync(MqttApplicat var mqttPacket = await packetBusItem.WaitAsync(); - return (MqttPublishPacket)mqttPacket; + return (MqttPacketWithIdentifier)mqttPacket; } /// @@ -72,7 +72,7 @@ public async Task DeliverApplicationMessageAsync(MqttApplicat /// However, an existing message in the queue may be dropped later to make room for the newly enqueued message. /// Such dropped messages can be tracked by subscribing to event. /// - public bool TryEnqueueApplicationMessage(MqttApplicationMessage applicationMessage, out MqttPublishPacket publishPacket) + public bool TryEnqueueApplicationMessage(MqttApplicationMessage applicationMessage, out MqttPacketWithIdentifier publishPacket) { ArgumentNullException.ThrowIfNull(applicationMessage); diff --git a/Source/MQTTnet.Tests/Server/Injection_Tests.cs b/Source/MQTTnet.Tests/Server/Injection_Tests.cs index 7ed1fc041..0bd4bb560 100644 --- a/Source/MQTTnet.Tests/Server/Injection_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Injection_Tests.cs @@ -3,6 +3,7 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Internal; using MQTTnet.Packets; +using MQTTnet.Protocol; using MQTTnet.Server; using MQTTnet.Server.Exceptions; @@ -28,7 +29,10 @@ public async Task Enqueue_Application_Message_At_Session_Level() await receiver1.SubscribeAsync("#"); await receiver2.SubscribeAsync("#"); - var message = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(); + var message = new MqttApplicationMessageBuilder() + .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce) + .WithTopic("InjectedOne").Build(); + var enqueued = clientStatus.TryEnqueueApplicationMessage(message, out var publishPacket); Assert.IsTrue(enqueued); @@ -215,7 +219,10 @@ public async Task Deliver_Application_Message_At_Session_Level() await receiver1.SubscribeAsync("#"); await receiver2.SubscribeAsync("#"); - var mqttApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(); + var mqttApplicationMessage = new MqttApplicationMessageBuilder() + .WithTopic("InjectedOne") + .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce) + .Build(); var publishPacket = await clientStatus.DeliverApplicationMessageAsync(mqttApplicationMessage); await LongTestDelay(); From 026029c3d9214ddfaf9f17741f207d732356bac5 Mon Sep 17 00:00:00 2001 From: Anton Smolkov Date: Tue, 3 Dec 2024 16:37:48 +0300 Subject: [PATCH 4/7] Use dedicated type for MattApplicationMessage injection result --- .../Status/MqttSessionStatus.cs | 27 ++++++++++++------- .../MQTTnet.Tests/Server/Injection_Tests.cs | 8 +++--- .../InjectMqttApplicationMessageResult.cs | 10 +++++++ 3 files changed, 31 insertions(+), 14 deletions(-) create mode 100644 Source/MQTTnet/InjectMqttApplicationMessageResult.cs diff --git a/Source/MQTTnet.Server/Status/MqttSessionStatus.cs b/Source/MQTTnet.Server/Status/MqttSessionStatus.cs index 2b64b632a..251dfa8e2 100644 --- a/Source/MQTTnet.Server/Status/MqttSessionStatus.cs +++ b/Source/MQTTnet.Server/Status/MqttSessionStatus.cs @@ -46,9 +46,10 @@ public Task DeleteAsync() /// /// The application message to deliver. /// - /// A task that represents the asynchronous operation. The result contains the delivered MQTT publish packet. + /// A task that represents the asynchronous operation. + /// The result contains the that includes the packet identifier of the enqueued message. /// - public async Task DeliverApplicationMessageAsync(MqttApplicationMessage applicationMessage) + public async Task DeliverApplicationMessageAsync(MqttApplicationMessage applicationMessage) { ArgumentNullException.ThrowIfNull(applicationMessage); @@ -57,14 +58,19 @@ public async Task DeliverApplicationMessageAsync(MqttA var mqttPacket = await packetBusItem.WaitAsync(); - return (MqttPacketWithIdentifier)mqttPacket; + var result = new InjectMqttApplicationMessageResult() + { + PacketIdentifier = ((MqttPacketWithIdentifier)mqttPacket).PacketIdentifier + }; + + return result; } /// /// Attempts to enqueue an application message to the session's send buffer. /// /// The application message to enqueue. - /// The corresponding MQTT publish packed, if the operation was successful. + /// that includes the packet identifier of the enqueued message. /// true if the message was successfully enqueued; otherwise, false. /// /// When is set to , @@ -72,20 +78,21 @@ public async Task DeliverApplicationMessageAsync(MqttA /// However, an existing message in the queue may be dropped later to make room for the newly enqueued message. /// Such dropped messages can be tracked by subscribing to event. /// - public bool TryEnqueueApplicationMessage(MqttApplicationMessage applicationMessage, out MqttPacketWithIdentifier publishPacket) + public bool TryEnqueueApplicationMessage(MqttApplicationMessage applicationMessage, out InjectMqttApplicationMessageResult injectResult) { ArgumentNullException.ThrowIfNull(applicationMessage); - publishPacket = MqttPublishPacketFactory.Create(applicationMessage); + var publishPacket = MqttPublishPacketFactory.Create(applicationMessage); var enqueueDataPacketResult = _session.EnqueueDataPacket(new MqttPacketBusItem(publishPacket)); - if (enqueueDataPacketResult == EnqueueDataPacketResult.Enqueued) + if (enqueueDataPacketResult != EnqueueDataPacketResult.Enqueued) { - return true; + injectResult = null; + return false; } - publishPacket = null; - return false; + injectResult = new InjectMqttApplicationMessageResult() { PacketIdentifier = publishPacket.PacketIdentifier }; + return true; } [Obsolete("This method is obsolete. Use TryEnqueueApplicationMessage instead.")] diff --git a/Source/MQTTnet.Tests/Server/Injection_Tests.cs b/Source/MQTTnet.Tests/Server/Injection_Tests.cs index 0bd4bb560..85ac53414 100644 --- a/Source/MQTTnet.Tests/Server/Injection_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Injection_Tests.cs @@ -33,14 +33,14 @@ public async Task Enqueue_Application_Message_At_Session_Level() .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce) .WithTopic("InjectedOne").Build(); - var enqueued = clientStatus.TryEnqueueApplicationMessage(message, out var publishPacket); + var enqueued = clientStatus.TryEnqueueApplicationMessage(message, out var injectResult); Assert.IsTrue(enqueued); await LongTestDelay(); Assert.AreEqual(1, messageReceivedHandler1.ReceivedEventArgs.Count); - Assert.AreEqual(publishPacket.PacketIdentifier, messageReceivedHandler1.ReceivedEventArgs[0].PacketIdentifier); + Assert.AreEqual(injectResult.PacketIdentifier, messageReceivedHandler1.ReceivedEventArgs[0].PacketIdentifier); Assert.AreEqual("InjectedOne", messageReceivedHandler1.ReceivedEventArgs[0].ApplicationMessage.Topic); // The second receiver should NOT receive the message. @@ -223,12 +223,12 @@ public async Task Deliver_Application_Message_At_Session_Level() .WithTopic("InjectedOne") .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce) .Build(); - var publishPacket = await clientStatus.DeliverApplicationMessageAsync(mqttApplicationMessage); + var injectResult = await clientStatus.DeliverApplicationMessageAsync(mqttApplicationMessage); await LongTestDelay(); Assert.AreEqual(1, messageReceivedHandler1.ReceivedEventArgs.Count); - Assert.AreEqual(publishPacket.PacketIdentifier, messageReceivedHandler1.ReceivedEventArgs[0].PacketIdentifier); + Assert.AreEqual(injectResult.PacketIdentifier, messageReceivedHandler1.ReceivedEventArgs[0].PacketIdentifier); Assert.AreEqual("InjectedOne", messageReceivedHandler1.ReceivedEventArgs[0].ApplicationMessage.Topic); // The second receiver should NOT receive the message. diff --git a/Source/MQTTnet/InjectMqttApplicationMessageResult.cs b/Source/MQTTnet/InjectMqttApplicationMessageResult.cs new file mode 100644 index 000000000..b2e6da802 --- /dev/null +++ b/Source/MQTTnet/InjectMqttApplicationMessageResult.cs @@ -0,0 +1,10 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +namespace MQTTnet; + +public class InjectMqttApplicationMessageResult +{ + public ushort PacketIdentifier { get; init; } +} \ No newline at end of file From 8061ad601a7591328260c717156113a13bde844d Mon Sep 17 00:00:00 2001 From: Anton Smolkov Date: Tue, 3 Dec 2024 17:46:10 +0300 Subject: [PATCH 5/7] Code review minor fixes --- Source/MQTTnet.Server/Status/MqttSessionStatus.cs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/Source/MQTTnet.Server/Status/MqttSessionStatus.cs b/Source/MQTTnet.Server/Status/MqttSessionStatus.cs index 251dfa8e2..780c08130 100644 --- a/Source/MQTTnet.Server/Status/MqttSessionStatus.cs +++ b/Source/MQTTnet.Server/Status/MqttSessionStatus.cs @@ -4,7 +4,6 @@ using System.Collections; using MQTTnet.Internal; -using MQTTnet.Packets; using MQTTnet.Server.Internal; using MQTTnet.Server.Internal.Formatter; @@ -53,17 +52,18 @@ public async Task DeliverApplicationMessageA { ArgumentNullException.ThrowIfNull(applicationMessage); - var packetBusItem = new MqttPacketBusItem(MqttPublishPacketFactory.Create(applicationMessage)); + var publishPacket = MqttPublishPacketFactory.Create(applicationMessage); + var packetBusItem = new MqttPacketBusItem(publishPacket); _session.EnqueueDataPacket(packetBusItem); - var mqttPacket = await packetBusItem.WaitAsync(); + await packetBusItem.WaitAsync().ConfigureAwait(false); - var result = new InjectMqttApplicationMessageResult() + var injectResult = new InjectMqttApplicationMessageResult() { - PacketIdentifier = ((MqttPacketWithIdentifier)mqttPacket).PacketIdentifier + PacketIdentifier = publishPacket.PacketIdentifier }; - return result; + return injectResult; } /// From 8a1c4c13663b4aec513a63b309ce0d8126a55a26 Mon Sep 17 00:00:00 2001 From: christian <6939810+chkr1011@users.noreply.github.com> Date: Tue, 10 Dec 2024 19:53:01 +0100 Subject: [PATCH 6/7] Apply project code style --- .../MqttPendingMessagesOverflowException.cs | 22 +++++++++---------- Source/MQTTnet.Server/Internal/MqttSession.cs | 1 + 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/Source/MQTTnet.Server/Exceptions/MqttPendingMessagesOverflowException.cs b/Source/MQTTnet.Server/Exceptions/MqttPendingMessagesOverflowException.cs index d17708240..0c045bc8c 100644 --- a/Source/MQTTnet.Server/Exceptions/MqttPendingMessagesOverflowException.cs +++ b/Source/MQTTnet.Server/Exceptions/MqttPendingMessagesOverflowException.cs @@ -2,18 +2,18 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. -namespace MQTTnet.Server.Exceptions +namespace MQTTnet.Server.Exceptions; + +public class MqttPendingMessagesOverflowException : Exception { - public class MqttPendingMessagesOverflowException : Exception + public MqttPendingMessagesOverflowException(string sessionId, MqttPendingMessagesOverflowStrategy overflowStrategy) : base( + $"Send buffer max pending messages overflow occurred for session '{sessionId}'. Strategy: {overflowStrategy}.") { - public string SessionId { get; } - public MqttPendingMessagesOverflowStrategy OverflowStrategy { get; } - - public MqttPendingMessagesOverflowException(string sessionId, MqttPendingMessagesOverflowStrategy overflowStrategy) - : base($"Send buffer max pending messages overflow occurred for session '{sessionId}'. Strategy: {overflowStrategy}.") - { - SessionId = sessionId; - OverflowStrategy = overflowStrategy; - } + SessionId = sessionId; + OverflowStrategy = overflowStrategy; } + + public MqttPendingMessagesOverflowStrategy OverflowStrategy { get; } + + public string SessionId { get; } } \ No newline at end of file diff --git a/Source/MQTTnet.Server/Internal/MqttSession.cs b/Source/MQTTnet.Server/Internal/MqttSession.cs index 30fa0b84d..329b285d5 100644 --- a/Source/MQTTnet.Server/Internal/MqttSession.cs +++ b/Source/MQTTnet.Server/Internal/MqttSession.cs @@ -126,6 +126,7 @@ public EnqueueDataPacketResult EnqueueDataPacket(MqttPacketBusItem packetBusItem if (firstItem != null) { firstItem.Fail(new MqttPendingMessagesOverflowException(Id, _serverOptions.PendingMessagesOverflowStrategy)); + if (_eventContainer.QueuedApplicationMessageOverwrittenEvent.HasHandlers) { var eventArgs = new QueueMessageOverwrittenEventArgs(Id, firstItem.Packet); From cf0a45262191a0b66ddf629f95276feb17840dcf Mon Sep 17 00:00:00 2001 From: christian <6939810+chkr1011@users.noreply.github.com> Date: Tue, 10 Dec 2024 19:56:43 +0100 Subject: [PATCH 7/7] Update release notes --- Source/ReleaseNotes.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Source/ReleaseNotes.md b/Source/ReleaseNotes.md index 47fc8a553..b84fa761a 100644 --- a/Source/ReleaseNotes.md +++ b/Source/ReleaseNotes.md @@ -8,5 +8,7 @@ * Namespace changes **(BREAKING CHANGE)** * Removal of Managed Client **(BREAKING CHANGE)** * Client: MQTT 5.0.0 is now the default version when connecting with a server **(BREAKING CHANGE)** +* Client: TryPrivate is no longer enabled by default **(BREAKING CHANGE)** * Server: Set default for "MaxPendingMessagesPerClient" to 1000 **(BREAKING CHANGE)** -* Server: Set SSL version to "None" which will let the OS choose the version **(BREAKING CHANGE)** \ No newline at end of file +* Server: Set SSL version to "None" which will let the OS choose the version **(BREAKING CHANGE)** +* Server: Fixed dead lock when awaiting a packet transmission but the packet gets dropped due to quotas (#2117, thanks to @AntonSmolkov) \ No newline at end of file