From fad95e1cc804be49c2fff2ddcd22a4140895e10e Mon Sep 17 00:00:00 2001 From: Jeff Scherrer Date: Fri, 30 Dec 2022 12:23:01 -0800 Subject: [PATCH 1/3] Added server extensibility interface. --- .../Server/Internal/IMqttServerExtensibility.cs | 16 ++++++++++++++++ Source/MQTTnet/Server/MqttServer.cs | 7 ++++++- 2 files changed, 22 insertions(+), 1 deletion(-) create mode 100644 Source/MQTTnet/Server/Internal/IMqttServerExtensibility.cs diff --git a/Source/MQTTnet/Server/Internal/IMqttServerExtensibility.cs b/Source/MQTTnet/Server/Internal/IMqttServerExtensibility.cs new file mode 100644 index 000000000..790f0480c --- /dev/null +++ b/Source/MQTTnet/Server/Internal/IMqttServerExtensibility.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Text; + +namespace MQTTnet.Server +{ + public interface IMqttServerExtensibility + { + + MqttClientSessionsManager MqttClientSessionsManager { get; } + + IDictionary SessionItems { get; } + + } +} diff --git a/Source/MQTTnet/Server/MqttServer.cs b/Source/MQTTnet/Server/MqttServer.cs index 6568f0b56..e30cfca06 100644 --- a/Source/MQTTnet/Server/MqttServer.cs +++ b/Source/MQTTnet/Server/MqttServer.cs @@ -17,7 +17,7 @@ namespace MQTTnet.Server { - public class MqttServer : Disposable + public class MqttServer : Disposable, IMqttServerExtensibility { readonly ICollection _adapters; readonly MqttClientSessionsManager _clientSessionsManager; @@ -167,6 +167,11 @@ public event Func ValidatingConnectionAsync public bool IsStarted => _cancellationTokenSource != null; + MqttClientSessionsManager IMqttServerExtensibility.MqttClientSessionsManager => _clientSessionsManager; + + IDictionary IMqttServerExtensibility.SessionItems => _sessionItems; + + public Task DeleteRetainedMessagesAsync() { ThrowIfNotStarted(); From 9c49b594dc28a9bf9c27fac89cec02e32ce9dd35 Mon Sep 17 00:00:00 2001 From: Jeff Scherrer Date: Sat, 31 Dec 2022 16:53:33 -0800 Subject: [PATCH 2/3] Added dispatch to client method. --- .../MQTTnet.Tests/Server/Injection_Tests.cs | 25 ++++ .../Internal/MqttClientSessionsManager.cs | 126 ++++++++++++++++++ 2 files changed, 151 insertions(+) diff --git a/Source/MQTTnet.Tests/Server/Injection_Tests.cs b/Source/MQTTnet.Tests/Server/Injection_Tests.cs index 14d30026e..01ce8048d 100644 --- a/Source/MQTTnet.Tests/Server/Injection_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Injection_Tests.cs @@ -38,6 +38,31 @@ public async Task Inject_Application_Message_At_Session_Level() } } + [TestMethod] + public async Task Inject_ApplicationMessage_To_Client_At_Server_Level() + { + using (var testEnvironment = CreateTestEnvironment()) + { + var server = await testEnvironment.StartServer(); + + var receiver = await testEnvironment.ConnectClient(); + + var messageReceivedHandler = testEnvironment.CreateApplicationMessageHandler(receiver); + + await receiver.SubscribeAsync("#"); + + var injectedApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(); + + var serverEx = (IMqttServerExtensibility)server; + await serverEx.MqttClientSessionsManager.DispatchApplicationMessageToClient(receiver.Options.ClientId, "InjectionSender", serverEx.SessionItems, injectedApplicationMessage, default); + + await LongTestDelay(); + + Assert.AreEqual(1, messageReceivedHandler.ReceivedEventArgs.Count); + Assert.AreEqual("InjectedOne", messageReceivedHandler.ReceivedEventArgs[0].ApplicationMessage.Topic); + } + } + [TestMethod] public async Task Inject_ApplicationMessage_At_Server_Level() { diff --git a/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs index b76380ab8..19618db22 100644 --- a/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs @@ -240,6 +240,132 @@ public async Task DispatchApplicationMessage( return new DispatchApplicationMessageResult(reasonCode, closeConnection, reasonString, userProperties); } + public async Task DispatchApplicationMessageToClient( + string clientId, + string senderId, + IDictionary senderSessionItems, + MqttApplicationMessage applicationMessage, + CancellationToken cancellationToken) + { + var processPublish = true; + var closeConnection = false; + string reasonString = null; + List userProperties = null; + var reasonCode = 0; // The reason code is later converted into several different but compatible enums! + + // Allow the user to intercept application message... + if (_eventContainer.InterceptingPublishEvent.HasHandlers) + { + var interceptingPublishEventArgs = new InterceptingPublishEventArgs(applicationMessage, cancellationToken, senderId, senderSessionItems); + if (string.IsNullOrEmpty(interceptingPublishEventArgs.ApplicationMessage.Topic)) + { + // This can happen if a topic alias us used but the topic is + // unknown to the server. + interceptingPublishEventArgs.Response.ReasonCode = MqttPubAckReasonCode.TopicNameInvalid; + interceptingPublishEventArgs.ProcessPublish = false; + } + + await _eventContainer.InterceptingPublishEvent.InvokeAsync(interceptingPublishEventArgs).ConfigureAwait(false); + + applicationMessage = interceptingPublishEventArgs.ApplicationMessage; + closeConnection = interceptingPublishEventArgs.CloseConnection; + processPublish = interceptingPublishEventArgs.ProcessPublish; + reasonString = interceptingPublishEventArgs.Response.ReasonString; + userProperties = interceptingPublishEventArgs.Response.UserProperties; + reasonCode = (int)interceptingPublishEventArgs.Response.ReasonCode; + } + + // Process the application message... + if (processPublish && applicationMessage != null) + { + try + { + if (applicationMessage.Retain) + { + await _retainedMessagesManager.UpdateMessage(senderId, applicationMessage).ConfigureAwait(false); + } + + MqttSession session; + bool foundSession; + lock (_sessionsManagementLock) + { + foundSession = _sessions.TryGetValue(clientId, out session); + } + if (!foundSession) + { + await FireApplicationMessageNotConsumedEvent(applicationMessage, senderId).ConfigureAwait(false); + + return new DispatchApplicationMessageResult( + (int)MqttPubAckReasonCode.NoMatchingSubscribers, + false, + reasonString, + userProperties); + } + + // Calculate application message topic hash once for subscription checks + MqttSubscription.CalculateTopicHash(applicationMessage.Topic, out var topicHash, out _, out _); + + if (!session.TryCheckSubscriptions( + applicationMessage.Topic, + topicHash, + applicationMessage.QualityOfServiceLevel, + senderId, + out var checkSubscriptionsResult)) + { + await FireApplicationMessageNotConsumedEvent(applicationMessage, senderId).ConfigureAwait(false); + + // Checking the subscriptions has failed for the session. The session + // will be ignored. + return new DispatchApplicationMessageResult( + (int)MqttPubAckReasonCode.NoMatchingSubscribers, + false, + reasonString, + userProperties); + } + + if (!checkSubscriptionsResult.IsSubscribed) + { + await FireApplicationMessageNotConsumedEvent(applicationMessage, senderId).ConfigureAwait(false); + + return new DispatchApplicationMessageResult( + (int)MqttPubAckReasonCode.NoMatchingSubscribers, + false, + reasonString, + userProperties); + } + + var publishPacketCopy = MqttPacketFactories.Publish.Create(applicationMessage); + publishPacketCopy.QualityOfServiceLevel = checkSubscriptionsResult.QualityOfServiceLevel; + publishPacketCopy.SubscriptionIdentifiers = checkSubscriptionsResult.SubscriptionIdentifiers; + + if (publishPacketCopy.QualityOfServiceLevel > 0) + { + publishPacketCopy.PacketIdentifier = session.PacketIdentifierProvider.GetNextPacketIdentifier(); + } + + if (checkSubscriptionsResult.RetainAsPublished) + { + // Transfer the original retain state from the publisher. This is a MQTTv5 feature. + publishPacketCopy.Retain = applicationMessage.Retain; + } + else + { + publishPacketCopy.Retain = false; + } + + session.EnqueueDataPacket(new MqttPacketBusItem(publishPacketCopy)); + + _logger.Verbose("Client '{0}': Queued PUBLISH packet with topic '{1}'.", session.Id, applicationMessage.Topic); + } + catch (Exception exception) + { + _logger.Error(exception, "Unhandled exception while processing next queued application message."); + } + } + + return new DispatchApplicationMessageResult(reasonCode, closeConnection, reasonString, userProperties); + } + public void Dispose() { _createConnectionSyncRoot.Dispose(); From 6ec7f768f79b0ef600553dcb2eeae04e04c473e3 Mon Sep 17 00:00:00 2001 From: Jeffery Michael <37258233+YAJeff@users.noreply.github.com> Date: Sat, 21 Jan 2023 08:42:30 -0800 Subject: [PATCH 3/3] Updated to latest changes. --- Source/MQTTnet.Tests/Server/Injection_Tests.cs | 2 +- Source/MQTTnet/Server/Internal/IMqttServerExtensibility.cs | 2 -- Source/MQTTnet/Server/MqttServer.cs | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/Source/MQTTnet.Tests/Server/Injection_Tests.cs b/Source/MQTTnet.Tests/Server/Injection_Tests.cs index 01ce8048d..b1fd0b6db 100644 --- a/Source/MQTTnet.Tests/Server/Injection_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Injection_Tests.cs @@ -54,7 +54,7 @@ public async Task Inject_ApplicationMessage_To_Client_At_Server_Level() var injectedApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(); var serverEx = (IMqttServerExtensibility)server; - await serverEx.MqttClientSessionsManager.DispatchApplicationMessageToClient(receiver.Options.ClientId, "InjectionSender", serverEx.SessionItems, injectedApplicationMessage, default); + await serverEx.MqttClientSessionsManager.DispatchApplicationMessageToClient(receiver.Options.ClientId, "InjectionSender", server.ServerSessionItems, injectedApplicationMessage, default); await LongTestDelay(); diff --git a/Source/MQTTnet/Server/Internal/IMqttServerExtensibility.cs b/Source/MQTTnet/Server/Internal/IMqttServerExtensibility.cs index 790f0480c..30b66a5e8 100644 --- a/Source/MQTTnet/Server/Internal/IMqttServerExtensibility.cs +++ b/Source/MQTTnet/Server/Internal/IMqttServerExtensibility.cs @@ -10,7 +10,5 @@ public interface IMqttServerExtensibility MqttClientSessionsManager MqttClientSessionsManager { get; } - IDictionary SessionItems { get; } - } } diff --git a/Source/MQTTnet/Server/MqttServer.cs b/Source/MQTTnet/Server/MqttServer.cs index 038ab4673..e1a659661 100644 --- a/Source/MQTTnet/Server/MqttServer.cs +++ b/Source/MQTTnet/Server/MqttServer.cs @@ -234,7 +234,7 @@ public Task InjectApplicationMessage(InjectedMqttApplicationMessage injectedAppl throw new NotSupportedException("Injected application messages must contain a topic. Topic alias is not supported."); } - var sessionItems = injectedApplicationMessage.CustomSessionItems ?? ServerSessionItems; + var sessionItems = ServerSessionItems; return _clientSessionsManager.DispatchApplicationMessage( injectedApplicationMessage.SenderClientId,