Skip to content

Commit 09ab30f

Browse files
kallayjchkr1011
andauthored
Adds events for delivered and dropped messages. (#1866)
* Adds events for delivered and dropped messages. * Rename new events to match other namings * Update ReleaseNotes.md --------- Co-authored-by: Christian <[email protected]>
1 parent ae84aa4 commit 09ab30f

9 files changed

+104
-7
lines changed

.github/workflows/ReleaseNotes.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
* [Client] Added support for custom CA chain validation (#1851, thanks to @rido-min).
2-
* [Client] Added support for custom CA chain validation (#1851, thanks to @rido-min).
32
* [Client] Fixed handling of unobserved tasks exceptions (#1871).
43
* [Client] Fixed not specified ReasonCode when using _SendExtendedAuthenticationExchangeDataAsync_ (#1882, thanks to @rido-min).
54
* [Server] Fixed not working _UpdateRetainedMessageAsync_ public api (#1858, thanks to @kimdiego2098).
65
* [Server] Added support for custom DISCONNECT packets when stopping the server or disconnect a client (BREAKING CHANGE!, #1846).
76
* [Server] Added new property to stop the server from accepting new connections even if it is running (#1846).
7+
* [Server] Added new events for delivered and dropped messages (#1866, thanks to @kallayj).

Source/MQTTnet/Internal/MqttPacketBus.cs

+7-2
Original file line numberDiff line numberDiff line change
@@ -104,17 +104,22 @@ public void Dispose()
104104
_signal.Dispose();
105105
}
106106

107-
public void DropFirstItem(MqttPacketBusPartition partition)
107+
public MqttPacketBusItem DropFirstItem(MqttPacketBusPartition partition)
108108
{
109109
lock (_syncRoot)
110110
{
111111
var partitionInstance = _partitions[(int)partition];
112112

113-
if (partitionInstance.Any())
113+
if (partitionInstance.Count > 0)
114114
{
115+
var firstItem = partitionInstance.First.Value;
115116
partitionInstance.RemoveFirst();
117+
118+
return firstItem;
116119
}
117120
}
121+
122+
return null;
118123
}
119124

120125
public void EnqueueItem(MqttPacketBusItem item, MqttPacketBusPartition partition)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System;
6+
7+
namespace MQTTnet.Server
8+
{
9+
public sealed class ApplicationMessageEnqueuedEventArgs : EventArgs
10+
{
11+
public ApplicationMessageEnqueuedEventArgs(string senderClientId, string receiverClientId, MqttApplicationMessage applicationMessage, bool isDropped)
12+
{
13+
SenderClientId = senderClientId ?? throw new ArgumentNullException( nameof(senderClientId));
14+
ReceiverClientId = receiverClientId ?? throw new ArgumentNullException(nameof(receiverClientId));
15+
ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage));
16+
IsDropped = isDropped;
17+
}
18+
19+
public string SenderClientId { get; }
20+
21+
public string ReceiverClientId { get; }
22+
23+
public bool IsDropped { get; }
24+
25+
public MqttApplicationMessage ApplicationMessage { get; }
26+
}
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System;
6+
using MQTTnet.Packets;
7+
8+
namespace MQTTnet.Server
9+
{
10+
public sealed class QueueMessageOverwrittenEventArgs : EventArgs
11+
{
12+
public QueueMessageOverwrittenEventArgs(string receiverClientId, MqttPacket packet)
13+
{
14+
ReceiverClientId = receiverClientId ?? throw new ArgumentNullException(nameof(receiverClientId));
15+
Packet = packet ?? throw new ArgumentNullException(nameof(packet));
16+
}
17+
18+
public MqttPacket Packet { get; }
19+
20+
public string ReceiverClientId { get; }
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
namespace MQTTnet.Server
6+
{
7+
public enum EnqueueDataPacketResult
8+
{
9+
Enqueued,
10+
Dropped
11+
}
12+
}

Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs

+8-1
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,14 @@ public async Task<DispatchApplicationMessageResult> DispatchApplicationMessage(
244244

245245
matchingSubscribersCount++;
246246

247-
session.EnqueueDataPacket(new MqttPacketBusItem(publishPacketCopy));
247+
var result = session.EnqueueDataPacket(new MqttPacketBusItem(publishPacketCopy));
248+
249+
if (_eventContainer.ApplicationMessageEnqueuedOrDroppedEvent.HasHandlers)
250+
{
251+
var eventArgs = new ApplicationMessageEnqueuedEventArgs(senderId, session.Id, applicationMessage, result == EnqueueDataPacketResult.Dropped);
252+
await _eventContainer.ApplicationMessageEnqueuedOrDroppedEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
253+
}
254+
248255
_logger.Verbose("Client '{0}': Queued PUBLISH packet with topic '{1}'", session.Id, applicationMessage.Topic);
249256
}
250257

Source/MQTTnet/Server/Internal/MqttServerEventContainer.cs

+4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ public sealed class MqttServerEventContainer
2222
public AsyncEvent<ClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopicEvent { get; } = new AsyncEvent<ClientUnsubscribedTopicEventArgs>();
2323

2424
public AsyncEvent<InterceptingClientApplicationMessageEnqueueEventArgs> InterceptingClientEnqueueEvent { get; } = new AsyncEvent<InterceptingClientApplicationMessageEnqueueEventArgs>();
25+
26+
public AsyncEvent<ApplicationMessageEnqueuedEventArgs> ApplicationMessageEnqueuedOrDroppedEvent { get; } = new AsyncEvent<ApplicationMessageEnqueuedEventArgs>();
27+
28+
public AsyncEvent<QueueMessageOverwrittenEventArgs> QueuedApplicationMessageOverwrittenEvent { get; } = new AsyncEvent<QueueMessageOverwrittenEventArgs>();
2529

2630
public AsyncEvent<InterceptingPacketEventArgs> InterceptingInboundPacketEvent { get; } = new AsyncEvent<InterceptingPacketEventArgs>();
2731

Source/MQTTnet/Server/Internal/MqttSession.cs

+11-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ namespace MQTTnet.Server
1818
public sealed class MqttSession : IDisposable
1919
{
2020
readonly MqttClientSessionsManager _clientSessionsManager;
21+
readonly MqttServerEventContainer _eventContainer;
2122
readonly MqttPacketBus _packetBus = new MqttPacketBus();
2223
readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
2324

@@ -44,6 +45,7 @@ public MqttSession(
4445
_connectPacket = connectPacket ?? throw new ArgumentNullException(nameof(connectPacket));
4546
_serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions));
4647
_clientSessionsManager = clientSessionsManager ?? throw new ArgumentNullException(nameof(clientSessionsManager));
48+
_eventContainer = eventContainer ?? throw new ArgumentNullException(nameof(eventContainer));
4749

4850
_subscriptionsManager = new MqttClientSubscriptionsManager(this, eventContainer, retainedMessagesManager, clientSessionsManager);
4951
}
@@ -117,20 +119,25 @@ public void EnqueueControlPacket(MqttPacketBusItem packetBusItem)
117119
_packetBus.EnqueueItem(packetBusItem, MqttPacketBusPartition.Control);
118120
}
119121

120-
public void EnqueueDataPacket(MqttPacketBusItem packetBusItem)
122+
public EnqueueDataPacketResult EnqueueDataPacket(MqttPacketBusItem packetBusItem)
121123
{
122124
if (_packetBus.ItemsCount(MqttPacketBusPartition.Data) >= _serverOptions.MaxPendingMessagesPerClient)
123125
{
124126
if (_serverOptions.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
125127
{
126-
return;
128+
return EnqueueDataPacketResult.Dropped;
127129
}
128130

129131
if (_serverOptions.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
130132
{
131133
// Only drop from the data partition. Dropping from control partition might break the connection
132134
// because the client does not receive PINGREQ packets etc. any longer.
133-
_packetBus.DropFirstItem(MqttPacketBusPartition.Data);
135+
var firstItem = _packetBus.DropFirstItem(MqttPacketBusPartition.Data);
136+
if (firstItem != null && _eventContainer.QueuedApplicationMessageOverwrittenEvent.HasHandlers)
137+
{
138+
var eventArgs = new QueueMessageOverwrittenEventArgs(Id, firstItem.Packet);
139+
_eventContainer.QueuedApplicationMessageOverwrittenEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
140+
}
134141
}
135142
}
136143

@@ -147,6 +154,7 @@ public void EnqueueDataPacket(MqttPacketBusItem packetBusItem)
147154
}
148155

149156
_packetBus.EnqueueItem(packetBusItem, MqttPacketBusPartition.Data);
157+
return EnqueueDataPacketResult.Enqueued;
150158
}
151159

152160
public void EnqueueHealthPacket(MqttPacketBusItem packetBusItem)

Source/MQTTnet/Server/MqttServer.cs

+12
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ public MqttServer(MqttServerOptions options, IEnumerable<IMqttServerAdapter> ada
5151
_keepAliveMonitor = new MqttServerKeepAliveMonitor(options, _clientSessionsManager, _rootLogger);
5252
}
5353

54+
public event Func<ApplicationMessageEnqueuedEventArgs, Task> ApplicationMessageEnqueuedOrDroppedAsync
55+
{
56+
add => _eventContainer.ApplicationMessageEnqueuedOrDroppedEvent.AddHandler(value);
57+
remove => _eventContainer.ApplicationMessageEnqueuedOrDroppedEvent.RemoveHandler(value);
58+
}
59+
5460
public event Func<ApplicationMessageNotConsumedEventArgs, Task> ApplicationMessageNotConsumedAsync
5561
{
5662
add => _eventContainer.ApplicationMessageNotConsumedEvent.AddHandler(value);
@@ -135,6 +141,12 @@ public event Func<EventArgs, Task> PreparingSessionAsync
135141
remove => _eventContainer.PreparingSessionEvent.RemoveHandler(value);
136142
}
137143

144+
public event Func<QueueMessageOverwrittenEventArgs, Task> QueuedApplicationMessageOverwrittenAsync
145+
{
146+
add => _eventContainer.QueuedApplicationMessageOverwrittenEvent.AddHandler(value);
147+
remove => _eventContainer.QueuedApplicationMessageOverwrittenEvent.RemoveHandler(value);
148+
}
149+
138150
public event Func<RetainedMessageChangedEventArgs, Task> RetainedMessageChangedAsync
139151
{
140152
add => _eventContainer.RetainedMessageChangedEvent.AddHandler(value);

0 commit comments

Comments
 (0)