Skip to content

Commit 27771f0

Browse files
AntonSmolkovAnton Smolkovchkr1011
authored
Improve session message injection (#2117)
* Improve session message injection methods * Add more tests for session injection * Change return type from MqttPublishPacket to MqttPacketWithIdentifier * Use dedicated type for MattApplicationMessage injection result * Code review minor fixes * Apply project code style * Update release notes --------- Co-authored-by: Anton Smolkov <[email protected]> Co-authored-by: christian <[email protected]>
1 parent e7de2b9 commit 27771f0

11 files changed

+521
-69
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
namespace MQTTnet.Server.Exceptions;
6+
7+
public class MqttPendingMessagesOverflowException : Exception
8+
{
9+
public MqttPendingMessagesOverflowException(string sessionId, MqttPendingMessagesOverflowStrategy overflowStrategy) : base(
10+
$"Send buffer max pending messages overflow occurred for session '{sessionId}'. Strategy: {overflowStrategy}.")
11+
{
12+
SessionId = sessionId;
13+
OverflowStrategy = overflowStrategy;
14+
}
15+
16+
public MqttPendingMessagesOverflowStrategy OverflowStrategy { get; }
17+
18+
public string SessionId { get; }
19+
}

Source/MQTTnet.Server/Internal/MqttSession.cs

+11-4
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using MQTTnet.Internal;
77
using MQTTnet.Packets;
88
using MQTTnet.Protocol;
9+
using MQTTnet.Server.Exceptions;
910

1011
namespace MQTTnet.Server.Internal;
1112

@@ -111,10 +112,11 @@ public void EnqueueControlPacket(MqttPacketBusItem packetBusItem)
111112

112113
public EnqueueDataPacketResult EnqueueDataPacket(MqttPacketBusItem packetBusItem)
113114
{
114-
if (_packetBus.ItemsCount(MqttPacketBusPartition.Data) >= _serverOptions.MaxPendingMessagesPerClient)
115+
if (PendingDataPacketsCount >= _serverOptions.MaxPendingMessagesPerClient)
115116
{
116117
if (_serverOptions.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
117118
{
119+
packetBusItem.Fail(new MqttPendingMessagesOverflowException(Id, _serverOptions.PendingMessagesOverflowStrategy));
118120
return EnqueueDataPacketResult.Dropped;
119121
}
120122

@@ -123,10 +125,15 @@ public EnqueueDataPacketResult EnqueueDataPacket(MqttPacketBusItem packetBusItem
123125
// Only drop from the data partition. Dropping from control partition might break the connection
124126
// because the client does not receive PINGREQ packets etc. any longer.
125127
var firstItem = _packetBus.DropFirstItem(MqttPacketBusPartition.Data);
126-
if (firstItem != null && _eventContainer.QueuedApplicationMessageOverwrittenEvent.HasHandlers)
128+
if (firstItem != null)
127129
{
128-
var eventArgs = new QueueMessageOverwrittenEventArgs(Id, firstItem.Packet);
129-
_eventContainer.QueuedApplicationMessageOverwrittenEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
130+
firstItem.Fail(new MqttPendingMessagesOverflowException(Id, _serverOptions.PendingMessagesOverflowStrategy));
131+
132+
if (_eventContainer.QueuedApplicationMessageOverwrittenEvent.HasHandlers)
133+
{
134+
var eventArgs = new QueueMessageOverwrittenEventArgs(Id, firstItem.Packet);
135+
_eventContainer.QueuedApplicationMessageOverwrittenEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
136+
}
130137
}
131138
}
132139
}

Source/MQTTnet.Server/Options/MqttServerOptionsBuilder.cs

+6
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,12 @@ public MqttServerOptionsBuilder WithMaxPendingMessagesPerClient(int value)
115115
return this;
116116
}
117117

118+
public MqttServerOptionsBuilder WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy value)
119+
{
120+
_options.PendingMessagesOverflowStrategy = value;
121+
return this;
122+
}
123+
118124
public MqttServerOptionsBuilder WithoutDefaultEndpoint()
119125
{
120126
_options.DefaultEndpointOptions.IsEnabled = false;

Source/MQTTnet.Server/Status/MqttSessionStatus.cs

+48-5
Original file line numberDiff line numberDiff line change
@@ -40,22 +40,65 @@ public Task DeleteAsync()
4040
return _session.DeleteAsync();
4141
}
4242

43-
public Task DeliverApplicationMessageAsync(MqttApplicationMessage applicationMessage)
43+
/// <summary>
44+
/// Delivers an application message immediately to the session.
45+
/// </summary>
46+
/// <param name="applicationMessage">The application message to deliver.</param>
47+
/// <returns>
48+
/// A task that represents the asynchronous operation.
49+
/// The result contains the <see cref="InjectMqttApplicationMessageResult"/> that includes the packet identifier of the enqueued message.
50+
/// </returns>
51+
public async Task<InjectMqttApplicationMessageResult> DeliverApplicationMessageAsync(MqttApplicationMessage applicationMessage)
4452
{
4553
ArgumentNullException.ThrowIfNull(applicationMessage);
4654

47-
var packetBusItem = new MqttPacketBusItem(MqttPublishPacketFactory.Create(applicationMessage));
55+
var publishPacket = MqttPublishPacketFactory.Create(applicationMessage);
56+
var packetBusItem = new MqttPacketBusItem(publishPacket);
4857
_session.EnqueueDataPacket(packetBusItem);
4958

50-
return packetBusItem.WaitAsync();
59+
await packetBusItem.WaitAsync().ConfigureAwait(false);
60+
61+
var injectResult = new InjectMqttApplicationMessageResult()
62+
{
63+
PacketIdentifier = publishPacket.PacketIdentifier
64+
};
65+
66+
return injectResult;
5167
}
5268

53-
public Task EnqueueApplicationMessageAsync(MqttApplicationMessage applicationMessage)
69+
/// <summary>
70+
/// Attempts to enqueue an application message to the session's send buffer.
71+
/// </summary>
72+
/// <param name="applicationMessage">The application message to enqueue.</param>
73+
/// <param name="injectResult"><see cref="InjectMqttApplicationMessageResult"/> that includes the packet identifier of the enqueued message.</param>
74+
/// <returns><c>true</c> if the message was successfully enqueued; otherwise, <c>false</c>.</returns>
75+
/// <remarks>
76+
/// When <see cref="MqttServerOptions.PendingMessagesOverflowStrategy"/> is set to <see cref="MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage"/>,
77+
/// this method always returns <c>true</c>.
78+
/// However, an existing message in the queue may be <b>dropped later</b> to make room for the newly enqueued message.
79+
/// Such dropped messages can be tracked by subscribing to <see cref="MqttServer.QueuedApplicationMessageOverwrittenAsync"/> event.
80+
/// </remarks>
81+
public bool TryEnqueueApplicationMessage(MqttApplicationMessage applicationMessage, out InjectMqttApplicationMessageResult injectResult)
5482
{
5583
ArgumentNullException.ThrowIfNull(applicationMessage);
5684

57-
_session.EnqueueDataPacket(new MqttPacketBusItem(MqttPublishPacketFactory.Create(applicationMessage)));
85+
var publishPacket = MqttPublishPacketFactory.Create(applicationMessage);
86+
var enqueueDataPacketResult = _session.EnqueueDataPacket(new MqttPacketBusItem(publishPacket));
87+
88+
if (enqueueDataPacketResult != EnqueueDataPacketResult.Enqueued)
89+
{
90+
injectResult = null;
91+
return false;
92+
}
5893

94+
injectResult = new InjectMqttApplicationMessageResult() { PacketIdentifier = publishPacket.PacketIdentifier };
95+
return true;
96+
}
97+
98+
[Obsolete("This method is obsolete. Use TryEnqueueApplicationMessage instead.")]
99+
public Task EnqueueApplicationMessageAsync(MqttApplicationMessage applicationMessage)
100+
{
101+
TryEnqueueApplicationMessage(applicationMessage, out _);
59102
return CompletedTask.Instance;
60103
}
61104
}

Source/MQTTnet.Tests/BaseTestClass.cs

+4-3
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,11 @@ namespace MQTTnet.Tests
1313
public abstract class BaseTestClass
1414
{
1515
public TestContext TestContext { get; set; }
16-
17-
protected TestEnvironment CreateTestEnvironment(MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311)
16+
17+
protected TestEnvironment CreateTestEnvironment(
18+
MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311, bool trackUnobservedTaskException = true)
1819
{
19-
return new TestEnvironment(TestContext, protocolVersion);
20+
return new TestEnvironment(TestContext, protocolVersion, trackUnobservedTaskException);
2021
}
2122

2223
protected Task LongTestDelay()

Source/MQTTnet.Tests/Mockups/TestEnvironment.cs

+6-2
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,16 @@ public TestEnvironment() : this(null)
3232
{
3333
}
3434

35-
public TestEnvironment(TestContext testContext, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311)
35+
public TestEnvironment(
36+
TestContext testContext, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311, bool trackUnobservedTaskException = true)
3637
{
3738
_protocolVersion = protocolVersion;
3839
TestContext = testContext;
3940

40-
TaskScheduler.UnobservedTaskException += TrackUnobservedTaskException;
41+
if (trackUnobservedTaskException)
42+
{
43+
TaskScheduler.UnobservedTaskException += TrackUnobservedTaskException;
44+
}
4145

4246
ServerLogger.LogMessagePublished += (s, e) =>
4347
{

0 commit comments

Comments
 (0)