Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retained message store extendable #1663

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Source/MQTTnet/Formatter/MqttSubAckPacketFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

using System;
using MQTTnet.Packets;
using MQTTnet.Server;
using MQTTnet.Server.Internal;

namespace MQTTnet.Formatter
{
Expand Down
25 changes: 25 additions & 0 deletions Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Server
{
public interface IMqttRetainedMessagesManager
{
Task ClearMessages();

Task<IList<MqttApplicationMessage>> GetMessages(CancellationToken cancellationToken = default);

Task LoadMessages(IEnumerable<SubscriptionRetainedMessagesResult> subscriptions, CancellationToken cancellationToken = default);

Task Start();

Task Stop();

Task UpdateMessage(string clientId, MqttApplicationMessage applicationMessage);
}
}
4 changes: 2 additions & 2 deletions Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public sealed class MqttClientSessionsManager : ISubscriptionChangedNotification
readonly MqttNetSourceLogger _logger;
readonly MqttServerOptions _options;

readonly MqttRetainedMessagesManager _retainedMessagesManager;
readonly IMqttRetainedMessagesManager _retainedMessagesManager;
readonly IMqttNetLogger _rootLogger;

// The _sessions dictionary contains all session, the _subscriberSessions hash set contains subscriber sessions only.
Expand All @@ -41,7 +41,7 @@ public sealed class MqttClientSessionsManager : ISubscriptionChangedNotification

public MqttClientSessionsManager(
MqttServerOptions options,
MqttRetainedMessagesManager retainedMessagesManager,
IMqttRetainedMessagesManager retainedMessagesManager,
MqttServerEventContainer eventContainer,
IMqttNetLogger logger)
{
Expand Down
155 changes: 56 additions & 99 deletions Source/MQTTnet/Server/Internal/MqttClientSubscriptionsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,15 @@
using MQTTnet.Internal;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Server.Internal;

namespace MQTTnet.Server
{
public sealed class MqttClientSubscriptionsManager : IDisposable
{
static readonly List<uint> EmptySubscriptionIdentifiers = new List<uint>();

readonly MqttServerEventContainer _eventContainer;
readonly Dictionary<ulong, HashSet<MqttSubscription>> _noWildcardSubscriptionsByTopicHash = new Dictionary<ulong, HashSet<MqttSubscription>>();
readonly MqttRetainedMessagesManager _retainedMessagesManager;
readonly IMqttRetainedMessagesManager _retainedMessagesManager;

readonly MqttSession _session;

Expand All @@ -37,7 +36,7 @@ public sealed class MqttClientSubscriptionsManager : IDisposable
public MqttClientSubscriptionsManager(
MqttSession session,
MqttServerEventContainer eventContainer,
MqttRetainedMessagesManager retainedMessagesManager,
IMqttRetainedMessagesManager retainedMessagesManager,
ISubscriptionChangedNotification subscriptionChangedNotification)
{
_session = session ?? throw new ArgumentNullException(nameof(session));
Expand Down Expand Up @@ -85,7 +84,7 @@ public CheckSubscriptionsResult CheckSubscriptions(string topic, ulong topicHash
var senderIsReceiver = string.Equals(senderId, _session.Id);
var maxQoSLevel = -1; // Not subscribed.

HashSet<uint> subscriptionIdentifiers = null;
var subscriptionIdentifiers = new HashSet<uint>();
var retainAsPublished = false;

foreach (var subscription in possibleSubscriptions)
Expand Down Expand Up @@ -114,11 +113,6 @@ public CheckSubscriptionsResult CheckSubscriptions(string topic, ulong topicHash

if (subscription.Identifier > 0)
{
if (subscriptionIdentifiers == null)
{
subscriptionIdentifiers = new HashSet<uint>();
}

subscriptionIdentifiers.Add(subscription.Identifier);
}
}
Expand All @@ -132,7 +126,7 @@ public CheckSubscriptionsResult CheckSubscriptions(string topic, ulong topicHash
{
IsSubscribed = true,
RetainAsPublished = retainAsPublished,
SubscriptionIdentifiers = subscriptionIdentifiers?.ToList() ?? EmptySubscriptionIdentifiers,
SubscriptionIdentifiers = subscriptionIdentifiers.ToList(),

// Start with the same QoS as the publisher.
QualityOfServiceLevel = qualityOfServiceLevel
Expand Down Expand Up @@ -167,12 +161,11 @@ public async Task<SubscribeResult> Subscribe(MqttSubscribePacket subscribePacket
throw new ArgumentNullException(nameof(subscribePacket));
}

var retainedApplicationMessages = await _retainedMessagesManager.GetMessages().ConfigureAwait(false);
var result = new SubscribeResult(subscribePacket.TopicFilters.Count);

var addedSubscriptions = new List<string>();
var finalTopicFilters = new List<MqttTopicFilter>();

var addedSubscriptions = new List<string>(subscribePacket.TopicFilters.Count);
var finalTopicFilters = new List<MqttTopicFilter>(subscribePacket.TopicFilters.Count);
var subscriptionRetainedMessages = new List<SubscriptionRetainedMessagesResult>();
// The topic filters are order by its QoS so that the higher QoS will win over a
// lower one.
foreach (var topicFilterItem in subscribePacket.TopicFilters.OrderByDescending(f => f.QualityOfServiceLevel))
Expand All @@ -197,12 +190,46 @@ public async Task<SubscribeResult> Subscribe(MqttSubscribePacket subscribePacket
continue;
}

var createSubscriptionResult = CreateSubscription(topicFilter, subscribePacket.SubscriptionIdentifier, subscriptionEventArgs.Response.ReasonCode);
var qualtityOfService = SubscribeReasonCodeToQualityOfServiceLevel(subscriptionEventArgs.Response.ReasonCode);
var createSubscriptionResult = CreateSubscription(topicFilter, subscribePacket.SubscriptionIdentifier, qualtityOfService);

addedSubscriptions.Add(topicFilter.Topic);
finalTopicFilters.Add(topicFilter);

FilterRetainedApplicationMessages(retainedApplicationMessages, createSubscriptionResult, result);
if (createSubscriptionResult.Subscription.RetainHandling == MqttRetainHandling.DoNotSendOnSubscribe)
{
// This is a MQTT V5+ feature.
continue;
}

if (createSubscriptionResult.Subscription.RetainHandling == MqttRetainHandling.SendAtSubscribeIfNewSubscriptionOnly && !createSubscriptionResult.IsNewSubscription)
{
// This is a MQTT V5+ feature.
continue;
}

subscriptionRetainedMessages.Add(createSubscriptionResult);
}

await _retainedMessagesManager.LoadMessages(subscriptionRetainedMessages, cancellationToken);

foreach (var subscriptionRetainedMessage in subscriptionRetainedMessages)
{
foreach (var retainedMessage in subscriptionRetainedMessage.RetainedMessages)
{
var retainedMessageMatch = new MqttRetainedMessageMatch(retainedMessage, subscriptionRetainedMessage.Subscription.GrantedQualityOfServiceLevel);
if (retainedMessageMatch.SubscriptionQualityOfServiceLevel > retainedMessageMatch.ApplicationMessage.QualityOfServiceLevel)
{
// UPGRADING the QoS is not allowed!
// From MQTT spec: Subscribing to a Topic Filter at QoS 2 is equivalent to saying
// "I would like to receive Messages matching this filter at the QoS with which they were published".
// This means a publisher is responsible for determining the maximum QoS a Message can be delivered at,
// but a subscriber is able to require that the Server downgrades the QoS to one more suitable for its usage.
retainedMessageMatch.SubscriptionQualityOfServiceLevel = retainedMessageMatch.ApplicationMessage.QualityOfServiceLevel;
}

result.RetainedMessages.Add(retainedMessageMatch);
}
}

// This call will add the new subscription to the internal storage.
Expand Down Expand Up @@ -307,27 +334,8 @@ public async Task<UnsubscribeResult> Unsubscribe(MqttUnsubscribePacket unsubscri
return result;
}

CreateSubscriptionResult CreateSubscription(MqttTopicFilter topicFilter, uint subscriptionIdentifier, MqttSubscribeReasonCode reasonCode)
SubscriptionRetainedMessagesResult CreateSubscription(MqttTopicFilter topicFilter, uint subscriptionIdentifier, MqttQualityOfServiceLevel grantedQualityOfServiceLevel)
{
MqttQualityOfServiceLevel grantedQualityOfServiceLevel;

if (reasonCode == MqttSubscribeReasonCode.GrantedQoS0)
{
grantedQualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce;
}
else if (reasonCode == MqttSubscribeReasonCode.GrantedQoS1)
{
grantedQualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce;
}
else if (reasonCode == MqttSubscribeReasonCode.GrantedQoS2)
{
grantedQualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce;
}
else
{
throw new InvalidOperationException();
}

var subscription = new MqttSubscription(
topicFilter.Topic,
topicFilter.NoLocal,
Expand All @@ -339,7 +347,6 @@ CreateSubscriptionResult CreateSubscription(MqttTopicFilter topicFilter, uint su
bool isNewSubscription;

// Add to subscriptions and maintain topic hash dictionaries

using (_subscriptionsLock.EnterAsync(CancellationToken.None).GetAwaiter().GetResult())
{
MqttSubscription.CalculateTopicHash(topicFilter.Topic, out var topicHash, out var topicHashMask, out var hasWildcard);
Expand Down Expand Up @@ -391,64 +398,21 @@ CreateSubscriptionResult CreateSubscription(MqttTopicFilter topicFilter, uint su
}
}

return new CreateSubscriptionResult
{
IsNewSubscription = isNewSubscription,
Subscription = subscription
};
return new SubscriptionRetainedMessagesResult(subscription, isNewSubscription);
}

static void FilterRetainedApplicationMessages(
IList<MqttApplicationMessage> retainedMessages,
CreateSubscriptionResult createSubscriptionResult,
SubscribeResult subscribeResult)
private static MqttQualityOfServiceLevel SubscribeReasonCodeToQualityOfServiceLevel(MqttSubscribeReasonCode reasonCode)
{
for (var index = retainedMessages.Count - 1; index >= 0; index--)
switch (reasonCode)
{
var retainedMessage = retainedMessages[index];
if (retainedMessage == null)
{
continue;
}

if (createSubscriptionResult.Subscription.RetainHandling == MqttRetainHandling.DoNotSendOnSubscribe)
{
// This is a MQTT V5+ feature.
continue;
}

if (createSubscriptionResult.Subscription.RetainHandling == MqttRetainHandling.SendAtSubscribeIfNewSubscriptionOnly && !createSubscriptionResult.IsNewSubscription)
{
// This is a MQTT V5+ feature.
continue;
}

if (MqttTopicFilterComparer.Compare(retainedMessage.Topic, createSubscriptionResult.Subscription.Topic) != MqttTopicFilterCompareResult.IsMatch)
{
continue;
}

var retainedMessageMatch = new MqttRetainedMessageMatch(retainedMessage, createSubscriptionResult.Subscription.GrantedQualityOfServiceLevel);
if (retainedMessageMatch.SubscriptionQualityOfServiceLevel > retainedMessageMatch.ApplicationMessage.QualityOfServiceLevel)
{
// UPGRADING the QoS is not allowed!
// From MQTT spec: Subscribing to a Topic Filter at QoS 2 is equivalent to saying
// "I would like to receive Messages matching this filter at the QoS with which they were published".
// This means a publisher is responsible for determining the maximum QoS a Message can be delivered at,
// but a subscriber is able to require that the Server downgrades the QoS to one more suitable for its usage.
retainedMessageMatch.SubscriptionQualityOfServiceLevel = retainedMessageMatch.ApplicationMessage.QualityOfServiceLevel;
}

if (subscribeResult.RetainedMessages == null)
{
subscribeResult.RetainedMessages = new List<MqttRetainedMessageMatch>();
}

subscribeResult.RetainedMessages.Add(retainedMessageMatch);

// Clear the retained message from the list because the client should receive every message only
// one time even if multiple subscriptions affect them.
retainedMessages[index] = null;
case MqttSubscribeReasonCode.GrantedQoS0:
return MqttQualityOfServiceLevel.AtMostOnce;
case MqttSubscribeReasonCode.GrantedQoS1:
return MqttQualityOfServiceLevel.AtLeastOnce;
case MqttSubscribeReasonCode.GrantedQoS2:
return MqttQualityOfServiceLevel.ExactlyOnce;
default:
throw new InvalidOperationException();
}
}

Expand Down Expand Up @@ -495,12 +459,5 @@ async Task<InterceptingUnsubscriptionEventArgs> InterceptUnsubscribe(string topi

return clientUnsubscribingTopicEventArgs;
}

sealed class CreateSubscriptionResult
{
public bool IsNewSubscription { get; set; }

public MqttSubscription Subscription { get; set; }
}
}
}
36 changes: 34 additions & 2 deletions Source/MQTTnet/Server/Internal/MqttRetainedMessagesManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Diagnostics;
using MQTTnet.Internal;

namespace MQTTnet.Server
{
public sealed class MqttRetainedMessagesManager
public sealed class MqttRetainedMessagesManager: IMqttRetainedMessagesManager
{
readonly Dictionary<string, MqttApplicationMessage> _messages = new Dictionary<string, MqttApplicationMessage>(4096);
readonly AsyncLock _storageAccessLock = new AsyncLock();
Expand Down Expand Up @@ -53,6 +54,16 @@ public async Task Start()
}
}

public Task Stop()
{
#if NET461_OR_GREATER
return Task.CompletedTask;
#else
return Task.FromResult(0);
#endif

}

public async Task UpdateMessage(string clientId, MqttApplicationMessage applicationMessage)
{
if (applicationMessage == null)
Expand Down Expand Up @@ -114,7 +125,7 @@ public async Task UpdateMessage(string clientId, MqttApplicationMessage applicat
}
}

public Task<IList<MqttApplicationMessage>> GetMessages()
public Task<IList<MqttApplicationMessage>> GetMessages(CancellationToken cancellationToken = default)
{
lock (_messages)
{
Expand All @@ -135,5 +146,26 @@ public async Task ClearMessages()
await _eventContainer.RetainedMessagesClearedEvent.InvokeAsync(EventArgs.Empty).ConfigureAwait(false);
}
}

public async Task LoadMessages(IEnumerable<SubscriptionRetainedMessagesResult> subscriptions, CancellationToken cancellationToken = default)
{
var allRetainedMessages = await GetMessages(cancellationToken);

for (var i = allRetainedMessages.Count - 1; i >= 0; i--)
{
var retainedMessage = allRetainedMessages[i];
foreach (var subscription in subscriptions)
{
if (MqttTopicFilterComparer.Compare(retainedMessage.Topic, subscription.Subscription.Topic) == MqttTopicFilterCompareResult.IsMatch)
{
subscription.RetainedMessages.Add(retainedMessage);

// Skip the following subscriptions, as each message may only be sent once.
// Following subscriptions do not have to be checked, as the message is sent anyway.
break;
}
}
}
}
}
}
3 changes: 2 additions & 1 deletion Source/MQTTnet/Server/Internal/MqttSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using MQTTnet.Internal;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Server.Internal;

namespace MQTTnet.Server
{
Expand All @@ -36,7 +37,7 @@ public MqttSession(
IDictionary items,
MqttServerOptions serverOptions,
MqttServerEventContainer eventContainer,
MqttRetainedMessagesManager retainedMessagesManager,
IMqttRetainedMessagesManager retainedMessagesManager,
MqttClientSessionsManager clientSessionsManager)
{
Id = clientId ?? throw new ArgumentNullException(nameof(clientId));
Expand Down
Loading