Skip to content

Commit 16a7c91

Browse files
Anton SmolkovAntonSmolkov
Anton Smolkov
authored andcommitted
Improve session message injection methods
1 parent 8089c6b commit 16a7c91

File tree

5 files changed

+73
-20
lines changed

5 files changed

+73
-20
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
namespace MQTTnet.Server.Exceptions
6+
{
7+
public class MqttPendingMessagesOverflowException : Exception
8+
{
9+
public string SessionId { get; }
10+
public MqttPendingMessagesOverflowStrategy OverflowStrategy { get; }
11+
12+
public MqttPendingMessagesOverflowException(string sessionId, MqttPendingMessagesOverflowStrategy overflowStrategy)
13+
: base($"Send buffer max pending messages overflow occurred for session '{sessionId}'. Strategy: {overflowStrategy}.")
14+
{
15+
SessionId = sessionId;
16+
OverflowStrategy = overflowStrategy;
17+
}
18+
}
19+
}

Source/MQTTnet.Server/Internal/MqttSession.cs

+10-4
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using MQTTnet.Internal;
77
using MQTTnet.Packets;
88
using MQTTnet.Protocol;
9+
using MQTTnet.Server.Exceptions;
910

1011
namespace MQTTnet.Server.Internal;
1112

@@ -109,10 +110,11 @@ public void EnqueueControlPacket(MqttPacketBusItem packetBusItem)
109110

110111
public EnqueueDataPacketResult EnqueueDataPacket(MqttPacketBusItem packetBusItem)
111112
{
112-
if (_packetBus.ItemsCount(MqttPacketBusPartition.Data) >= _serverOptions.MaxPendingMessagesPerClient)
113+
if (PendingDataPacketsCount >= _serverOptions.MaxPendingMessagesPerClient)
113114
{
114115
if (_serverOptions.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
115116
{
117+
packetBusItem.Fail(new MqttPendingMessagesOverflowException(Id, _serverOptions.PendingMessagesOverflowStrategy));
116118
return EnqueueDataPacketResult.Dropped;
117119
}
118120

@@ -121,10 +123,14 @@ public EnqueueDataPacketResult EnqueueDataPacket(MqttPacketBusItem packetBusItem
121123
// Only drop from the data partition. Dropping from control partition might break the connection
122124
// because the client does not receive PINGREQ packets etc. any longer.
123125
var firstItem = _packetBus.DropFirstItem(MqttPacketBusPartition.Data);
124-
if (firstItem != null && _eventContainer.QueuedApplicationMessageOverwrittenEvent.HasHandlers)
126+
if (firstItem != null)
125127
{
126-
var eventArgs = new QueueMessageOverwrittenEventArgs(Id, firstItem.Packet);
127-
_eventContainer.QueuedApplicationMessageOverwrittenEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
128+
firstItem.Fail(new MqttPendingMessagesOverflowException(Id, _serverOptions.PendingMessagesOverflowStrategy));
129+
if (_eventContainer.QueuedApplicationMessageOverwrittenEvent.HasHandlers)
130+
{
131+
var eventArgs = new QueueMessageOverwrittenEventArgs(Id, firstItem.Packet);
132+
_eventContainer.QueuedApplicationMessageOverwrittenEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
133+
}
128134
}
129135
}
130136
}

Source/MQTTnet.Server/Status/MqttSessionStatus.cs

+40-4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
using System.Collections;
66
using MQTTnet.Internal;
7+
using MQTTnet.Packets;
78
using MQTTnet.Server.Internal;
89
using MQTTnet.Server.Internal.Formatter;
910

@@ -40,22 +41,57 @@ public Task DeleteAsync()
4041
return _session.DeleteAsync();
4142
}
4243

43-
public Task DeliverApplicationMessageAsync(MqttApplicationMessage applicationMessage)
44+
/// <summary>
45+
/// Delivers an application message immediately to the session.
46+
/// </summary>
47+
/// <param name="applicationMessage">The application message to deliver.</param>
48+
/// <returns>
49+
/// A task that represents the asynchronous operation. The result contains the delivered MQTT publish packet.
50+
/// </returns>
51+
public async Task<MqttPublishPacket> DeliverApplicationMessageAsync(MqttApplicationMessage applicationMessage)
4452
{
4553
ArgumentNullException.ThrowIfNull(applicationMessage);
4654

4755
var packetBusItem = new MqttPacketBusItem(MqttPublishPacketFactory.Create(applicationMessage));
4856
_session.EnqueueDataPacket(packetBusItem);
4957

50-
return packetBusItem.WaitAsync();
58+
var mqttPacket = await packetBusItem.WaitAsync();
59+
60+
return (MqttPublishPacket)mqttPacket;
5161
}
5262

53-
public Task EnqueueApplicationMessageAsync(MqttApplicationMessage applicationMessage)
63+
/// <summary>
64+
/// Attempts to enqueue an application message to the session's send buffer.
65+
/// </summary>
66+
/// <param name="applicationMessage">The application message to enqueue.</param>
67+
/// <param name="publishPacket">The corresponding MQTT publish packed, if the operation was successful.</param>
68+
/// <returns><c>true</c> if the message was successfully enqueued; otherwise, <c>false</c>.</returns>
69+
/// <remarks>
70+
/// When <see cref="MqttServerOptions.PendingMessagesOverflowStrategy"/> is set to <see cref="MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage"/>,
71+
/// this method always returns <c>true</c>.
72+
/// However, an existing message in the queue may be <b>dropped later</b> to make room for the newly enqueued message.
73+
/// Such dropped messages can be tracked by subscribing to <see cref="MqttServer.QueuedApplicationMessageOverwrittenAsync"/> event.
74+
/// </remarks>
75+
public bool TryEnqueueApplicationMessage(MqttApplicationMessage applicationMessage, out MqttPublishPacket publishPacket)
5476
{
5577
ArgumentNullException.ThrowIfNull(applicationMessage);
5678

57-
_session.EnqueueDataPacket(new MqttPacketBusItem(MqttPublishPacketFactory.Create(applicationMessage)));
79+
publishPacket = MqttPublishPacketFactory.Create(applicationMessage);
80+
var enqueueDataPacketResult = _session.EnqueueDataPacket(new MqttPacketBusItem(publishPacket));
5881

82+
if (enqueueDataPacketResult == EnqueueDataPacketResult.Enqueued)
83+
{
84+
return true;
85+
}
86+
87+
publishPacket = null;
88+
return false;
89+
}
90+
91+
[Obsolete("This method is obsolete. Use TryEnqueueApplicationMessage instead.")]
92+
public Task EnqueueApplicationMessageAsync(MqttApplicationMessage applicationMessage)
93+
{
94+
TryEnqueueApplicationMessage(applicationMessage, out _);
5995
return CompletedTask.Instance;
6096
}
6197
}

Source/MQTTnet/Internal/MqttPacketBus.cs

-8
Original file line numberDiff line numberDiff line change
@@ -141,14 +141,6 @@ public List<MqttPacket> ExportPackets(MqttPacketBusPartition partition)
141141
}
142142
}
143143

144-
public int ItemsCount(MqttPacketBusPartition partition)
145-
{
146-
lock (_syncRoot)
147-
{
148-
return _partitions[(int)partition].Count;
149-
}
150-
}
151-
152144
public int PartitionItemsCount(MqttPacketBusPartition partition)
153145
{
154146
lock (_syncRoot)

Source/MQTTnet/Internal/MqttPacketBusItem.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ namespace MQTTnet.Internal
1010
{
1111
public sealed class MqttPacketBusItem
1212
{
13-
readonly AsyncTaskCompletionSource<bool> _promise = new AsyncTaskCompletionSource<bool>();
14-
13+
readonly AsyncTaskCompletionSource<MqttPacket> _promise = new AsyncTaskCompletionSource<MqttPacket>();
14+
1515
public MqttPacketBusItem(MqttPacket packet)
1616
{
1717
Packet = packet ?? throw new ArgumentNullException(nameof(packet));
@@ -28,7 +28,7 @@ public void Cancel()
2828

2929
public void Complete()
3030
{
31-
_promise.TrySetResult(true);
31+
_promise.TrySetResult(Packet);
3232
Completed?.Invoke(this, EventArgs.Empty);
3333
}
3434

@@ -37,7 +37,7 @@ public void Fail(Exception exception)
3737
_promise.TrySetException(exception);
3838
}
3939

40-
public Task WaitAsync()
40+
public Task<MqttPacket> WaitAsync()
4141
{
4242
return _promise.Task;
4343
}

0 commit comments

Comments
 (0)