diff --git a/MQTTnet.sln b/MQTTnet.sln index 34abf629f..31b2e1e03 100644 --- a/MQTTnet.sln +++ b/MQTTnet.sln @@ -23,6 +23,9 @@ EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Samples", "Samples\MQTTnet.Samples.csproj", "{71CF35F5-3327-4A91-AAF4-5340F6701771}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Tests", "Source\MQTTnet.Tests\MQTTnet.Tests.csproj", "{B270F32A-9F3E-42EE-A989-813E35E29ADB}" + ProjectSection(ProjectDependencies) = postProject + {576D87F2-C86B-4C29-BD10-428EBBB1E6FD} = {576D87F2-C86B-4C29-BD10-428EBBB1E6FD} + EndProjectSection EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.AspNetCore.Tests", "Source\MQTTnet.AspNetCore.Tests\MQTTnet.AspNetCore.Tests.csproj", "{A238BBBF-C75F-482D-9CC3-BB34ABA9B675}" EndProject @@ -32,6 +35,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.TestApp", "Source\M EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.AspTestApp", "Source\MQTTnet.AspTestApp\MQTTnet.AspTestApp.csproj", "{72867E4C-4E15-4E8E-8FAB-AE9253286BBC}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Extensions.Metrics", "Source\MQTTnet.Extensions.Metrics\MQTTnet.Extensions.Metrics.csproj", "{576D87F2-C86B-4C29-BD10-428EBBB1E6FD}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -82,6 +87,10 @@ Global {72867E4C-4E15-4E8E-8FAB-AE9253286BBC}.Debug|Any CPU.Build.0 = Debug|Any CPU {72867E4C-4E15-4E8E-8FAB-AE9253286BBC}.Release|Any CPU.ActiveCfg = Release|Any CPU {72867E4C-4E15-4E8E-8FAB-AE9253286BBC}.Release|Any CPU.Build.0 = Release|Any CPU + {576D87F2-C86B-4C29-BD10-428EBBB1E6FD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {576D87F2-C86B-4C29-BD10-428EBBB1E6FD}.Debug|Any CPU.Build.0 = Debug|Any CPU + {576D87F2-C86B-4C29-BD10-428EBBB1E6FD}.Release|Any CPU.ActiveCfg = Release|Any CPU + {576D87F2-C86B-4C29-BD10-428EBBB1E6FD}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/Source/MQTTnet.Extensions.Metrics/MQTTnet.Extensions.Metrics.csproj b/Source/MQTTnet.Extensions.Metrics/MQTTnet.Extensions.Metrics.csproj new file mode 100644 index 000000000..6ca228ebe --- /dev/null +++ b/Source/MQTTnet.Extensions.Metrics/MQTTnet.Extensions.Metrics.csproj @@ -0,0 +1,17 @@ + + + + netstandard2.0;netstandard2.1;net6.0;net7.0 + $(TargetFrameworks);net452;net461;net48 + $(TargetFrameworks);uap10.0 + + + + + + + + + + + diff --git a/Source/MQTTnet.Extensions.Metrics/MqttMetricsExtensions.cs b/Source/MQTTnet.Extensions.Metrics/MqttMetricsExtensions.cs new file mode 100644 index 000000000..1065872c5 --- /dev/null +++ b/Source/MQTTnet.Extensions.Metrics/MqttMetricsExtensions.cs @@ -0,0 +1,68 @@ +using System; +using System.Diagnostics.Metrics; +using System.Threading.Tasks; +using MQTTnet.Server; +using MQTTnet.Diagnostics.Instrumentation; + +namespace MQTTnet.Extensions.Metrics +{ + public static class MqttMetricsExtensions + { + public static void AddMetrics(this IMqttServerMetricSource server, Meter meter) + { + if (meter == null) + { + throw new ArgumentNullException(nameof(meter)); + } + + meter.CreateObservableUpDownCounter("mqtt.sessions.count", server.GetActiveSessionCount); + meter.CreateObservableCounter("mqtt.clients.count", server.GetActiveClientCount); + + var publishedMessageCounter = meter.CreateCounter("mqtt.messages.published.count"); + server.InterceptingPublishAsync += (InterceptingPublishEventArgs arg) => + { + publishedMessageCounter.Add(1); + return Task.CompletedTask; + }; + + var deliveredMessageCounter = meter.CreateCounter( + name: "mqtt.messages.delivered.count", + unit: "total", + description: "Cumulative total number of MQTTnet messages delivered to subscribers"); + + var droppedQueueFullMessageCounter = meter.CreateCounter( + name: "mqtt.messages.dropped.queuefull.count", + unit: "total", + description: "Cumulative total number of MQTTnet messages dropped because the queue is full"); + + server.ApplicationMessageEnqueuedOrDroppedAsync += (ApplicationMessageEnqueuedEventArgs args) => + { + if (args.IsDropped) + { + droppedQueueFullMessageCounter.Add(1); + } + else + { + deliveredMessageCounter.Add(1); + } + return Task.CompletedTask; + }; + server.QueuedApplicationMessageOverwrittenAsync += (QueueMessageOverwrittenEventArgs args) => + { + droppedQueueFullMessageCounter.Add(1); + return Task.CompletedTask; + }; + + var successfulConnectionsCounter = meter.CreateCounter( + name: "mqtt.clients.connected.count", + unit: "total", + description: "Cumulative total number of connections successfully established with MQTTnet"); + server.ClientConnectedAsync += (ClientConnectedEventArgs arg) => + { + successfulConnectionsCounter.Add(1); + return Task.CompletedTask; + }; + + } + } +} diff --git a/Source/MQTTnet.Tests/Extensions/Metrics_Tests.cs b/Source/MQTTnet.Tests/Extensions/Metrics_Tests.cs new file mode 100644 index 000000000..30ee36c51 --- /dev/null +++ b/Source/MQTTnet.Tests/Extensions/Metrics_Tests.cs @@ -0,0 +1,169 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. +using System; +using System.Collections.Generic; +using System.Diagnostics.Metrics; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Diagnostics.Instrumentation; +using MQTTnet.Extensions.Metrics; +using MQTTnet.Formatter; +using MQTTnet.Packets; +using MQTTnet.Server; + +namespace MQTTnet.Tests.Extensions +{ + [TestClass] + public sealed class Metrics_Tests : BaseTestClass + { + [TestMethod] + public async Task All_Metrics() + { + int sessionCount = 0; + int clientCount = 0; + long connectedCount = 0; + long publishedCount = 0; + long deliveredCount = 0; + long droppedCount = 0; + + using (var testEnvironment = CreateTestEnvironment()) + using (Meter meter = new Meter("TEST")) + using (MeterListener meterListener = new MeterListener()) + { + meterListener.InstrumentPublished = (instrument, listener) => + { + switch (instrument.Name) + { + case "mqtt.sessions.count": + case "mqtt.clients.count": + case "mqtt.clients.connected.count": + case "mqtt.messages.published.count": + case "mqtt.messages.delivered.count": + case "mqtt.messages.dropped.queuefull.count": + listener.EnableMeasurementEvents(instrument); + break; + } + }; + + var metricsSource = new TestMetricsSource(); + MeasurementCallback intCallback = + (Instrument instrument, int measurement, ReadOnlySpan> tags, object state) => + { + switch (instrument.Name) + { + case "mqtt.sessions.count": + sessionCount = measurement; + break; + case "mqtt.clients.count": + clientCount = measurement; + break; + } + }; + meterListener.SetMeasurementEventCallback(intCallback); + MeasurementCallback longCallback = + (Instrument instrument, long measurement, ReadOnlySpan> tags, object state) => + { + switch (instrument.Name) + { + case "mqtt.clients.connected.count": + connectedCount += measurement; + break; + case "mqtt.messages.published.count": + publishedCount += measurement; + break; + case "mqtt.messages.delivered.count": + deliveredCount += measurement; + break; + case "mqtt.messages.dropped.queuefull.count": + droppedCount += measurement; + break; + } + }; + meterListener.SetMeasurementEventCallback(longCallback); + meterListener.Start(); + metricsSource.AddMetrics(meter); + meterListener.RecordObservableInstruments(); + Assert.AreEqual(1000, clientCount); + Assert.AreEqual(1001, sessionCount); + + await metricsSource.TriggerClientConnection(); + Assert.AreEqual(1, connectedCount); + + await metricsSource.TriggerPublish(); + Assert.AreEqual(1, publishedCount); + + await metricsSource.TriggerEnqueueOrDrop(dropQueueFull: false); + Assert.AreEqual(1, deliveredCount); + + await metricsSource.TriggerEnqueueOrDrop(dropQueueFull: true); + Assert.AreEqual(1, droppedCount); + + await metricsSource.TriggerOverwriteMessage(); + Assert.AreEqual(2, droppedCount); + } + } + + private class TestMetricsSource : IMqttServerMetricSource + { + public event Func ClientConnectedAsync; + public event Func InterceptingPublishAsync; + public event Func ApplicationMessageEnqueuedOrDroppedAsync; + public event Func QueuedApplicationMessageOverwrittenAsync; + + public int GetActiveClientCount() + { + return 1000; + } + + public int GetActiveSessionCount() + { + return 1001; + } + + public Task TriggerClientConnection() + { + var args = new ClientConnectedEventArgs( + new MqttConnectPacket(), + MqttProtocolVersion.V500, + null, + new Dictionary()); + return ClientConnectedAsync(args); + } + + public Task TriggerPublish() + { + var args = new InterceptingPublishEventArgs( + new MqttApplicationMessage(), + CancellationToken.None, + "client", + new Dictionary()); + return InterceptingPublishAsync(args); + } + + public Task TriggerEnqueueOrDrop(bool dropQueueFull) + { + var args = new ApplicationMessageEnqueuedEventArgs( + "sender", + "receiver", + new MqttApplicationMessage(), + dropQueueFull); + return ApplicationMessageEnqueuedOrDroppedAsync(args); + } + + public Task TriggerOverwriteMessage() + { + var args = new QueueMessageOverwrittenEventArgs( + "receiver", + new DummyPacket()); + return QueuedApplicationMessageOverwrittenAsync(args); + } + + private class DummyPacket : MqttPacket + { + public DummyPacket() {} + } + } + } +} \ No newline at end of file diff --git a/Source/MQTTnet.Tests/MQTTnet.Tests.csproj b/Source/MQTTnet.Tests/MQTTnet.Tests.csproj index c19b803fe..a08d7af52 100644 --- a/Source/MQTTnet.Tests/MQTTnet.Tests.csproj +++ b/Source/MQTTnet.Tests/MQTTnet.Tests.csproj @@ -15,12 +15,14 @@ + + diff --git a/Source/MQTTnet/Diagnostics/Instrumentation/IMqttServerMetricSource.cs b/Source/MQTTnet/Diagnostics/Instrumentation/IMqttServerMetricSource.cs new file mode 100644 index 000000000..52ff8a017 --- /dev/null +++ b/Source/MQTTnet/Diagnostics/Instrumentation/IMqttServerMetricSource.cs @@ -0,0 +1,16 @@ +using System; +using System.Threading.Tasks; +using MQTTnet.Server; + +namespace MQTTnet.Diagnostics.Instrumentation +{ + public interface IMqttServerMetricSource + { + int GetActiveSessionCount(); + int GetActiveClientCount(); + event Func ClientConnectedAsync; + event Func InterceptingPublishAsync; + event Func ApplicationMessageEnqueuedOrDroppedAsync; + event Func QueuedApplicationMessageOverwrittenAsync; + } +} \ No newline at end of file diff --git a/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs index 4f6864b88..5740a9524 100644 --- a/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs @@ -59,6 +59,10 @@ public MqttClientSessionsManager( _eventContainer = eventContainer ?? throw new ArgumentNullException(nameof(eventContainer)); } + public int GetActiveClientCount() => _clients.Values.Count(c => !c.IsTakenOver && c.IsRunning); + + public int GetActiveSessionCount() => _sessionsStorage.Count; + public async Task CloseAllConnections(MqttServerClientDisconnectOptions options) { if (options == null) @@ -300,6 +304,22 @@ public MqttClient GetClient(string id) } } + public MqttClientStatus GetClientStatus(string id) + { + lock (_clients) + { + if (!_clients.TryGetValue(id, out var client)) + { + throw new InvalidOperationException($"Client with ID '{id}' not found."); + } + var clientStatus = new MqttClientStatus(client) + { + Session = new MqttSessionStatus(client.Session) + }; + return clientStatus; + } + } + public List GetClients() { lock (_clients) diff --git a/Source/MQTTnet/Server/Internal/MqttSessionsStorage.cs b/Source/MQTTnet/Server/Internal/MqttSessionsStorage.cs index bbb10e430..e0cd78775 100644 --- a/Source/MQTTnet/Server/Internal/MqttSessionsStorage.cs +++ b/Source/MQTTnet/Server/Internal/MqttSessionsStorage.cs @@ -13,6 +13,8 @@ public sealed class MqttSessionsStorage { readonly Dictionary _sessions = new Dictionary(4096); + public int Count => _sessions.Count; + public void Clear() { // Make sure that the sessions are also getting disposed! diff --git a/Source/MQTTnet/Server/MqttServer.cs b/Source/MQTTnet/Server/MqttServer.cs index 19ba2244e..ca1830632 100644 --- a/Source/MQTTnet/Server/MqttServer.cs +++ b/Source/MQTTnet/Server/MqttServer.cs @@ -14,11 +14,12 @@ using MQTTnet.Internal; using MQTTnet.Packets; using MQTTnet.Protocol; +using MQTTnet.Diagnostics.Instrumentation; using MQTTnet.Server.Disconnecting; namespace MQTTnet.Server { - public class MqttServer : Disposable + public class MqttServer : Disposable, IMqttServerMetricSource { readonly ICollection _adapters; readonly MqttClientSessionsManager _clientSessionsManager; @@ -51,6 +52,10 @@ public MqttServer(MqttServerOptions options, IEnumerable ada _keepAliveMonitor = new MqttServerKeepAliveMonitor(options, _clientSessionsManager, _rootLogger); } + public int GetActiveClientCount() => _clientSessionsManager.GetActiveClientCount(); + + public int GetActiveSessionCount() => _clientSessionsManager.GetActiveSessionCount(); + public event Func ApplicationMessageEnqueuedOrDroppedAsync { add => _eventContainer.ApplicationMessageEnqueuedOrDroppedEvent.AddHandler(value);