diff --git a/Source/MQTTnet.Server/Exceptions/MqttPendingMessagesOverflowException.cs b/Source/MQTTnet.Server/Exceptions/MqttPendingMessagesOverflowException.cs
new file mode 100644
index 000000000..0c045bc8c
--- /dev/null
+++ b/Source/MQTTnet.Server/Exceptions/MqttPendingMessagesOverflowException.cs
@@ -0,0 +1,19 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+namespace MQTTnet.Server.Exceptions;
+
+public class MqttPendingMessagesOverflowException : Exception
+{
+    public MqttPendingMessagesOverflowException(string sessionId, MqttPendingMessagesOverflowStrategy overflowStrategy) : base(
+        $"Send buffer max pending messages overflow occurred for session '{sessionId}'. Strategy: {overflowStrategy}.")
+    {
+        SessionId = sessionId;
+        OverflowStrategy = overflowStrategy;
+    }
+
+    public MqttPendingMessagesOverflowStrategy OverflowStrategy { get; }
+
+    public string SessionId { get; }
+}
\ No newline at end of file
diff --git a/Source/MQTTnet.Server/Internal/MqttSession.cs b/Source/MQTTnet.Server/Internal/MqttSession.cs
index 55814b849..7e3ab1279 100644
--- a/Source/MQTTnet.Server/Internal/MqttSession.cs
+++ b/Source/MQTTnet.Server/Internal/MqttSession.cs
@@ -6,6 +6,7 @@
 using MQTTnet.Internal;
 using MQTTnet.Packets;
 using MQTTnet.Protocol;
+using MQTTnet.Server.Exceptions;
 
 namespace MQTTnet.Server.Internal;
 
@@ -111,10 +112,11 @@ public void EnqueueControlPacket(MqttPacketBusItem packetBusItem)
 
     public EnqueueDataPacketResult EnqueueDataPacket(MqttPacketBusItem packetBusItem)
     {
-        if (_packetBus.ItemsCount(MqttPacketBusPartition.Data) >= _serverOptions.MaxPendingMessagesPerClient)
+        if (PendingDataPacketsCount >= _serverOptions.MaxPendingMessagesPerClient)
         {
             if (_serverOptions.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
             {
+                packetBusItem.Fail(new MqttPendingMessagesOverflowException(Id, _serverOptions.PendingMessagesOverflowStrategy));
                 return EnqueueDataPacketResult.Dropped;
             }
 
@@ -123,10 +125,15 @@ public EnqueueDataPacketResult EnqueueDataPacket(MqttPacketBusItem packetBusItem
                 // Only drop from the data partition. Dropping from control partition might break the connection
                 // because the client does not receive PINGREQ packets etc. any longer.
                 var firstItem = _packetBus.DropFirstItem(MqttPacketBusPartition.Data);
-                if (firstItem != null && _eventContainer.QueuedApplicationMessageOverwrittenEvent.HasHandlers)
+                if (firstItem != null)
                 {
-                    var eventArgs = new QueueMessageOverwrittenEventArgs(Id, firstItem.Packet);
-                    _eventContainer.QueuedApplicationMessageOverwrittenEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
+                    firstItem.Fail(new MqttPendingMessagesOverflowException(Id, _serverOptions.PendingMessagesOverflowStrategy));
+
+                    if (_eventContainer.QueuedApplicationMessageOverwrittenEvent.HasHandlers)
+                    {
+                        var eventArgs = new QueueMessageOverwrittenEventArgs(Id, firstItem.Packet);
+                        _eventContainer.QueuedApplicationMessageOverwrittenEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
+                    }
                 }
             }
         }
diff --git a/Source/MQTTnet.Server/Options/MqttServerOptionsBuilder.cs b/Source/MQTTnet.Server/Options/MqttServerOptionsBuilder.cs
index 2e86e21eb..c238ea016 100644
--- a/Source/MQTTnet.Server/Options/MqttServerOptionsBuilder.cs
+++ b/Source/MQTTnet.Server/Options/MqttServerOptionsBuilder.cs
@@ -115,6 +115,12 @@ public MqttServerOptionsBuilder WithMaxPendingMessagesPerClient(int value)
             return this;
         }
 
+        public MqttServerOptionsBuilder WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy value)
+        {
+            _options.PendingMessagesOverflowStrategy = value;
+            return this;
+        }
+
         public MqttServerOptionsBuilder WithoutDefaultEndpoint()
         {
             _options.DefaultEndpointOptions.IsEnabled = false;
diff --git a/Source/MQTTnet.Server/Status/MqttSessionStatus.cs b/Source/MQTTnet.Server/Status/MqttSessionStatus.cs
index ecb8460fb..780c08130 100644
--- a/Source/MQTTnet.Server/Status/MqttSessionStatus.cs
+++ b/Source/MQTTnet.Server/Status/MqttSessionStatus.cs
@@ -40,22 +40,65 @@ public Task DeleteAsync()
         return _session.DeleteAsync();
     }
 
-    public Task DeliverApplicationMessageAsync(MqttApplicationMessage applicationMessage)
+    /// <summary>
+    /// Delivers an application message immediately to the session.
+    /// </summary>
+    /// <param name="applicationMessage">The application message to deliver.</param>
+    /// <returns>
+    /// A task that represents the asynchronous operation.
+    /// The result contains the <see cref="InjectMqttApplicationMessageResult"/> that includes the packet identifier of the enqueued message.
+    /// </returns>
+    public async Task<InjectMqttApplicationMessageResult> DeliverApplicationMessageAsync(MqttApplicationMessage applicationMessage)
     {
         ArgumentNullException.ThrowIfNull(applicationMessage);
 
-        var packetBusItem = new MqttPacketBusItem(MqttPublishPacketFactory.Create(applicationMessage));
+        var publishPacket = MqttPublishPacketFactory.Create(applicationMessage);
+        var packetBusItem = new MqttPacketBusItem(publishPacket);
         _session.EnqueueDataPacket(packetBusItem);
 
-        return packetBusItem.WaitAsync();
+        await packetBusItem.WaitAsync().ConfigureAwait(false);
+
+        var injectResult = new InjectMqttApplicationMessageResult()
+        {
+            PacketIdentifier = publishPacket.PacketIdentifier
+        };
+
+        return injectResult;
     }
 
-    public Task EnqueueApplicationMessageAsync(MqttApplicationMessage applicationMessage)
+    /// <summary>
+    /// Attempts to enqueue an application message to the session's send buffer.
+    /// </summary>
+    /// <param name="applicationMessage">The application message to enqueue.</param>
+    /// <param name="injectResult"><see cref="InjectMqttApplicationMessageResult"/> that includes the packet identifier of the enqueued message.</param>
+    /// <returns><c>true</c> if the message was successfully enqueued; otherwise, <c>false</c>.</returns>
+    /// <remarks>
+    /// When <see cref="MqttServerOptions.PendingMessagesOverflowStrategy"/> is set to <see cref="MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage"/>,
+    /// this method always returns <c>true</c>.
+    /// However, an existing message in the queue may be <b>dropped later</b> to make room for the newly enqueued message.
+    /// Such dropped messages can be tracked by subscribing to <see cref="MqttServer.QueuedApplicationMessageOverwrittenAsync"/> event.
+    /// </remarks>
+    public bool TryEnqueueApplicationMessage(MqttApplicationMessage applicationMessage, out InjectMqttApplicationMessageResult injectResult)
     {
         ArgumentNullException.ThrowIfNull(applicationMessage);
 
-        _session.EnqueueDataPacket(new MqttPacketBusItem(MqttPublishPacketFactory.Create(applicationMessage)));
+        var publishPacket = MqttPublishPacketFactory.Create(applicationMessage);
+        var enqueueDataPacketResult = _session.EnqueueDataPacket(new MqttPacketBusItem(publishPacket));
+
+        if (enqueueDataPacketResult != EnqueueDataPacketResult.Enqueued)
+        {
+            injectResult = null;
+            return false;
+        }
 
+        injectResult = new InjectMqttApplicationMessageResult() { PacketIdentifier = publishPacket.PacketIdentifier };
+        return true;
+    }
+
+    [Obsolete("This method is obsolete. Use TryEnqueueApplicationMessage instead.")]
+    public Task EnqueueApplicationMessageAsync(MqttApplicationMessage applicationMessage)
+    {
+        TryEnqueueApplicationMessage(applicationMessage, out _);
         return CompletedTask.Instance;
     }
 }
\ No newline at end of file
diff --git a/Source/MQTTnet.Tests/BaseTestClass.cs b/Source/MQTTnet.Tests/BaseTestClass.cs
index 8e5248e7f..00291b024 100644
--- a/Source/MQTTnet.Tests/BaseTestClass.cs
+++ b/Source/MQTTnet.Tests/BaseTestClass.cs
@@ -13,10 +13,11 @@ namespace MQTTnet.Tests
     public abstract class BaseTestClass
     {
         public TestContext TestContext { get; set; }
-        
-        protected TestEnvironment CreateTestEnvironment(MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311)
+
+        protected TestEnvironment CreateTestEnvironment(
+            MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311, bool trackUnobservedTaskException = true)
         {
-            return new TestEnvironment(TestContext, protocolVersion);
+            return new TestEnvironment(TestContext, protocolVersion, trackUnobservedTaskException);
         }
 
         protected Task LongTestDelay()
diff --git a/Source/MQTTnet.Tests/Mockups/TestEnvironment.cs b/Source/MQTTnet.Tests/Mockups/TestEnvironment.cs
index 4f1391f15..6f2d8ae73 100644
--- a/Source/MQTTnet.Tests/Mockups/TestEnvironment.cs
+++ b/Source/MQTTnet.Tests/Mockups/TestEnvironment.cs
@@ -32,12 +32,16 @@ public TestEnvironment() : this(null)
         {
         }
 
-        public TestEnvironment(TestContext testContext, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311)
+        public TestEnvironment(
+            TestContext testContext, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311, bool trackUnobservedTaskException = true)
         {
             _protocolVersion = protocolVersion;
             TestContext = testContext;
 
-            TaskScheduler.UnobservedTaskException += TrackUnobservedTaskException;
+            if (trackUnobservedTaskException)
+            {
+                TaskScheduler.UnobservedTaskException += TrackUnobservedTaskException;
+            }
 
             ServerLogger.LogMessagePublished += (s, e) =>
             {
diff --git a/Source/MQTTnet.Tests/Server/Injection_Tests.cs b/Source/MQTTnet.Tests/Server/Injection_Tests.cs
index cefbc34dd..85ac53414 100644
--- a/Source/MQTTnet.Tests/Server/Injection_Tests.cs
+++ b/Source/MQTTnet.Tests/Server/Injection_Tests.cs
@@ -1,7 +1,11 @@
+using System;
 using System.Threading.Tasks;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using MQTTnet.Internal;
+using MQTTnet.Packets;
+using MQTTnet.Protocol;
 using MQTTnet.Server;
+using MQTTnet.Server.Exceptions;
 
 namespace MQTTnet.Tests.Server
 {
@@ -9,79 +13,443 @@ namespace MQTTnet.Tests.Server
     public sealed class Injection_Tests : BaseTestClass
     {
         [TestMethod]
-        public async Task Inject_Application_Message_At_Session_Level()
+        public async Task Enqueue_Application_Message_At_Session_Level()
         {
-            using (var testEnvironment = CreateTestEnvironment())
+            using var testEnvironment = CreateTestEnvironment();
+
+            var server = await testEnvironment.StartServer();
+            var receiver1 = await testEnvironment.ConnectClient();
+            var receiver2 = await testEnvironment.ConnectClient();
+            var messageReceivedHandler1 = testEnvironment.CreateApplicationMessageHandler(receiver1);
+            var messageReceivedHandler2 = testEnvironment.CreateApplicationMessageHandler(receiver2);
+
+            var status = await server.GetSessionsAsync();
+            var clientStatus = status[0];
+
+            await receiver1.SubscribeAsync("#");
+            await receiver2.SubscribeAsync("#");
+
+            var message = new MqttApplicationMessageBuilder()
+                .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
+                .WithTopic("InjectedOne").Build();
+
+            var enqueued = clientStatus.TryEnqueueApplicationMessage(message, out var injectResult);
+
+            Assert.IsTrue(enqueued);
+
+            await LongTestDelay();
+
+            Assert.AreEqual(1, messageReceivedHandler1.ReceivedEventArgs.Count);
+            Assert.AreEqual(injectResult.PacketIdentifier, messageReceivedHandler1.ReceivedEventArgs[0].PacketIdentifier);
+            Assert.AreEqual("InjectedOne", messageReceivedHandler1.ReceivedEventArgs[0].ApplicationMessage.Topic);
+
+            // The second receiver should NOT receive the message.
+            Assert.AreEqual(0, messageReceivedHandler2.ReceivedEventArgs.Count);
+        }
+
+        [TestMethod]
+        public async Task Enqueue_Application_Message_At_Session_Level_QueueOverflow_DropNewMessageStrategy()
+        {
+            using var testEnvironment = CreateTestEnvironment(trackUnobservedTaskException: false);
+
+            var server = await testEnvironment.StartServer(
+                builder => builder
+                    .WithMaxPendingMessagesPerClient(1)
+                    .WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropNewMessage));
+
+            var receiver = await testEnvironment.ConnectClient();
+
+            var firstMessageOutboundPacketInterceptedTcs = new TaskCompletionSource();
+            server.InterceptingOutboundPacketAsync += async args =>
             {
-                var server = await testEnvironment.StartServer();
-                var receiver1 = await testEnvironment.ConnectClient();
-                var receiver2 = await testEnvironment.ConnectClient();
-                var messageReceivedHandler1 = testEnvironment.CreateApplicationMessageHandler(receiver1);
-                var messageReceivedHandler2 = testEnvironment.CreateApplicationMessageHandler(receiver2);
+                // - The first message is dequeued normally and calls this delay
+                // - The second message fills the outbound queue
+                // - The third message overflows the outbound queue
+                if (args.Packet is MqttPublishPacket)
+                {
+                    firstMessageOutboundPacketInterceptedTcs.SetResult();
+                    await Task.Delay(TimeSpan.FromDays(1), args.CancellationToken);
+                }
+            };
 
-                var status = await server.GetSessionsAsync();
-                var clientStatus = status[0];
+            var firstMessageEvicted = false;
+            var secondMessageEvicted = false;
+            var thirdMessageEvicted = false;
 
-                await receiver1.SubscribeAsync("#");
-                await receiver2.SubscribeAsync("#");
+            server.QueuedApplicationMessageOverwrittenAsync += args =>
+            {
+                if (args.Packet is not MqttPublishPacket publishPacket)
+                {
+                    return Task.CompletedTask;
+                }
+
+                switch (publishPacket.Topic)
+                {
+                    case "InjectedOne":
+                        firstMessageEvicted = true;
+                        break;
+                    case "InjectedTwo":
+                        secondMessageEvicted = true;
+                        break;
+                    case "InjectedThree":
+                        thirdMessageEvicted = true;
+                        break;
+                }
 
-                await clientStatus.EnqueueApplicationMessageAsync(new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build());
+                return Task.CompletedTask;
+            };
 
-                await LongTestDelay();
+            var status = await server.GetSessionsAsync();
+            var clientStatus = status[0];
+            await receiver.SubscribeAsync("#");
 
-                Assert.AreEqual(1, messageReceivedHandler1.ReceivedEventArgs.Count);
-                Assert.AreEqual("InjectedOne", messageReceivedHandler1.ReceivedEventArgs[0].ApplicationMessage.Topic);
+            var firstMessageEnqueued = clientStatus.TryEnqueueApplicationMessage(
+                new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(), out _);
+            await firstMessageOutboundPacketInterceptedTcs.Task;
 
-                // The second receiver should NOT receive the message.
-                Assert.AreEqual(0, messageReceivedHandler2.ReceivedEventArgs.Count);
-            }
+            var secondMessageEnqueued = clientStatus.TryEnqueueApplicationMessage(
+                new MqttApplicationMessageBuilder().WithTopic("InjectedTwo").Build(), out _);
+
+            var thirdMessageEnqueued = clientStatus.TryEnqueueApplicationMessage(
+                new MqttApplicationMessageBuilder().WithTopic("InjectedThree").Build(), out _);
+
+            // Due to the DropNewMessage strategy the third message will not be enqueued.
+            // As a result, no existing messages in the queue will be dropped (evicted).
+            Assert.IsTrue(firstMessageEnqueued);
+            Assert.IsTrue(secondMessageEnqueued);
+            Assert.IsFalse(thirdMessageEnqueued);
+
+            Assert.IsFalse(firstMessageEvicted);
+            Assert.IsFalse(secondMessageEvicted);
+            Assert.IsFalse(thirdMessageEvicted);
         }
 
+
         [TestMethod]
-        public async Task Inject_ApplicationMessage_At_Server_Level()
+        public async Task Enqueue_Application_Message_At_Session_Level_QueueOverflow_DropOldestQueuedMessageStrategy()
         {
-            using (var testEnvironment = CreateTestEnvironment())
+            using var testEnvironment = CreateTestEnvironment(trackUnobservedTaskException: false);
+
+            var server = await testEnvironment.StartServer(
+                builder => builder
+                    .WithMaxPendingMessagesPerClient(1)
+                    .WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage));
+
+            var receiver = await testEnvironment.ConnectClient();
+
+            var firstMessageOutboundPacketInterceptedTcs = new TaskCompletionSource();
+            server.InterceptingOutboundPacketAsync += async args =>
             {
-                var server = await testEnvironment.StartServer();
+                // - The first message is dequeued normally and calls this delay
+                // - The second message fills the outbound queue
+                // - The third message overflows the outbound queue
+                if (args.Packet is MqttPublishPacket)
+                {
+                    firstMessageOutboundPacketInterceptedTcs.SetResult();
+                    await Task.Delay(TimeSpan.FromDays(1), args.CancellationToken);
+                }
+            };
+
+            var firstMessageEvicted = false;
+            var secondMessageEvicted = false;
+            var thirdMessageEvicted = false;
+
+            server.QueuedApplicationMessageOverwrittenAsync += args =>
+            {
+                if (args.Packet is not MqttPublishPacket publishPacket)
+                {
+                    return Task.CompletedTask;
+                }
 
-                var receiver = await testEnvironment.ConnectClient();
+                switch (publishPacket.Topic)
+                {
+                    case "InjectedOne":
+                        firstMessageEvicted = true;
+                        break;
+                    case "InjectedTwo":
+                        secondMessageEvicted = true;
+                        break;
+                    case "InjectedThree":
+                        thirdMessageEvicted = true;
+                        break;
+                }
+
+                return Task.CompletedTask;
+            };
 
-                var messageReceivedHandler = testEnvironment.CreateApplicationMessageHandler(receiver);
+            var status = await server.GetSessionsAsync();
+            var clientStatus = status[0];
+            await receiver.SubscribeAsync("#");
 
-                await receiver.SubscribeAsync("#");
+            var firstMessageEnqueued = clientStatus.TryEnqueueApplicationMessage(
+                new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(), out _);
+            await firstMessageOutboundPacketInterceptedTcs.Task;
 
-                var injectedApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build();
+            var secondMessageEnqueued = clientStatus.TryEnqueueApplicationMessage(
+                new MqttApplicationMessageBuilder().WithTopic("InjectedTwo").Build(), out _);
 
-                await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(injectedApplicationMessage));
+            var thirdMessageEnqueued = clientStatus.TryEnqueueApplicationMessage(
+                new MqttApplicationMessageBuilder().WithTopic("InjectedThree").Build(), out _);
 
-                await LongTestDelay();
+            // Due to the DropOldestQueuedMessage strategy, all messages will be enqueued initially.
+            // But the second message will eventually be dropped (evicted) to make room for the third one.
+            Assert.IsTrue(firstMessageEnqueued);
+            Assert.IsTrue(secondMessageEnqueued);
+            Assert.IsTrue(thirdMessageEnqueued);
 
-                Assert.AreEqual(1, messageReceivedHandler.ReceivedEventArgs.Count);
-                Assert.AreEqual("InjectedOne", messageReceivedHandler.ReceivedEventArgs[0].ApplicationMessage.Topic);
-            }
+            Assert.IsFalse(firstMessageEvicted);
+            Assert.IsTrue(secondMessageEvicted);
+            Assert.IsFalse(thirdMessageEvicted);
         }
 
         [TestMethod]
-        public async Task Intercept_Injected_Application_Message()
+        public async Task Deliver_Application_Message_At_Session_Level()
+        {
+            using var testEnvironment = CreateTestEnvironment();
+
+            var server = await testEnvironment.StartServer();
+            var receiver1 = await testEnvironment.ConnectClient();
+            var receiver2 = await testEnvironment.ConnectClient();
+            var messageReceivedHandler1 = testEnvironment.CreateApplicationMessageHandler(receiver1);
+            var messageReceivedHandler2 = testEnvironment.CreateApplicationMessageHandler(receiver2);
+
+            var status = await server.GetSessionsAsync();
+            var clientStatus = status[0];
+
+            await receiver1.SubscribeAsync("#");
+            await receiver2.SubscribeAsync("#");
+
+            var mqttApplicationMessage = new MqttApplicationMessageBuilder()
+                .WithTopic("InjectedOne")
+                .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
+                .Build();
+            var injectResult = await clientStatus.DeliverApplicationMessageAsync(mqttApplicationMessage);
+
+            await LongTestDelay();
+
+            Assert.AreEqual(1, messageReceivedHandler1.ReceivedEventArgs.Count);
+            Assert.AreEqual(injectResult.PacketIdentifier, messageReceivedHandler1.ReceivedEventArgs[0].PacketIdentifier);
+            Assert.AreEqual("InjectedOne", messageReceivedHandler1.ReceivedEventArgs[0].ApplicationMessage.Topic);
+
+            // The second receiver should NOT receive the message.
+            Assert.AreEqual(0, messageReceivedHandler2.ReceivedEventArgs.Count);
+        }
+
+        [TestMethod]
+        public async Task Deliver_Application_Message_At_Session_Level_QueueOverflow_DropNewMessageStrategy()
+        {
+            using var testEnvironment = CreateTestEnvironment();
+
+            var server = await testEnvironment.StartServer(
+                builder => builder
+                    .WithMaxPendingMessagesPerClient(1)
+                    .WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropNewMessage));
+
+            var receiver = await testEnvironment.ConnectClient();
+
+            var firstMessageOutboundPacketInterceptedTcs = new TaskCompletionSource();
+            server.InterceptingOutboundPacketAsync += async args =>
+            {
+                // - The first message is dequeued normally and calls this delay
+                // - The second message fills the outbound queue
+                // - The third message overflows the outbound queue
+                if (args.Packet is MqttPublishPacket)
+                {
+                    firstMessageOutboundPacketInterceptedTcs.SetResult();
+                    await Task.Delay(TimeSpan.FromDays(1), args.CancellationToken);
+                }
+            };
+
+            var firstMessageEvicted = false;
+            var secondMessageEvicted = false;
+            var thirdMessageEvicted = false;
+
+            server.QueuedApplicationMessageOverwrittenAsync += args =>
+            {
+                if (args.Packet is not MqttPublishPacket publishPacket)
+                {
+                    return Task.CompletedTask;
+                }
+
+                switch (publishPacket.Topic)
+                {
+                    case "InjectedOne":
+                        firstMessageEvicted = true;
+                        break;
+                    case "InjectedTwo":
+                        secondMessageEvicted = true;
+                        break;
+                    case "InjectedThree":
+                        thirdMessageEvicted = true;
+                        break;
+                }
+
+                return Task.CompletedTask;
+            };
+
+            var status = await server.GetSessionsAsync();
+            var clientStatus = status[0];
+            await receiver.SubscribeAsync("#");
+
+            var firstMessageTask = Task.Run(
+                () => clientStatus.DeliverApplicationMessageAsync(
+                    new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build()));
+            await LongTestDelay();
+            await firstMessageOutboundPacketInterceptedTcs.Task;
+
+            var secondMessageTask = Task.Run(
+                () => clientStatus.DeliverApplicationMessageAsync(
+                    new MqttApplicationMessageBuilder().WithTopic("InjectedTwo").Build()));
+            await LongTestDelay();
+
+            var thirdMessageTask = Task.Run(
+                () => clientStatus.DeliverApplicationMessageAsync(
+                    new MqttApplicationMessageBuilder().WithTopic("InjectedThree").Build()));
+            await LongTestDelay();
+
+            Task.WaitAny(firstMessageTask, secondMessageTask, thirdMessageTask);
+
+            // Due to the DropNewMessage strategy the third message delivery will fail.
+            // As a result, no existing messages in the queue will be dropped (evicted).
+            Assert.AreEqual(firstMessageTask.Status, TaskStatus.WaitingForActivation);
+            Assert.AreEqual(secondMessageTask.Status, TaskStatus.WaitingForActivation);
+            Assert.AreEqual(thirdMessageTask.Status, TaskStatus.Faulted);
+            Assert.IsTrue(thirdMessageTask.Exception?.InnerException is MqttPendingMessagesOverflowException);
+
+            Assert.IsFalse(firstMessageEvicted);
+            Assert.IsFalse(secondMessageEvicted);
+            Assert.IsFalse(thirdMessageEvicted);
+        }
+
+        [TestMethod]
+        public async Task Deliver_Application_Message_At_Session_Level_QueueOverflow_DropOldestQueuedMessageStrategy()
         {
-            using (var testEnvironment = CreateTestEnvironment())
+            using var testEnvironment = CreateTestEnvironment(trackUnobservedTaskException: false);
+
+            var server = await testEnvironment.StartServer(
+                builder => builder
+                    .WithMaxPendingMessagesPerClient(1)
+                    .WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage));
+
+            var receiver = await testEnvironment.ConnectClient();
+
+            var firstMessageOutboundPacketInterceptedTcs = new TaskCompletionSource();
+            server.InterceptingOutboundPacketAsync += async args =>
             {
-                var server = await testEnvironment.StartServer();
+                // - The first message is dequeued normally and calls this delay
+                // - The second message fills the outbound queue
+                // - The third message overflows the outbound queue
+                if (args.Packet is MqttPublishPacket)
+                {
+                    firstMessageOutboundPacketInterceptedTcs.SetResult();
+                    await Task.Delay(TimeSpan.FromDays(1), args.CancellationToken);
+                }
+            };
 
-                MqttApplicationMessage interceptedMessage = null;
-                server.InterceptingPublishAsync += eventArgs =>
+            var firstMessageEvicted = false;
+            var secondMessageEvicted = false;
+            var thirdMessageEvicted = false;
+
+            server.QueuedApplicationMessageOverwrittenAsync += args =>
+            {
+                if (args.Packet is not MqttPublishPacket publishPacket)
+                {
+                    return Task.CompletedTask;
+                }
+
+                switch (publishPacket.Topic)
                 {
-                    interceptedMessage = eventArgs.ApplicationMessage;
-                    return CompletedTask.Instance;
-                };
+                    case "InjectedOne":
+                        firstMessageEvicted = true;
+                        break;
+                    case "InjectedTwo":
+                        secondMessageEvicted = true;
+                        break;
+                    case "InjectedThree":
+                        thirdMessageEvicted = true;
+                        break;
+                }
+
+                return Task.CompletedTask;
+            };
+
+            var status = await server.GetSessionsAsync();
+            var clientStatus = status[0];
+            await receiver.SubscribeAsync("#");
+
+            var firstMessageTask = Task.Run(
+                () => clientStatus.DeliverApplicationMessageAsync(
+                    new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build()));
+            await LongTestDelay();
+            await firstMessageOutboundPacketInterceptedTcs.Task;
+
+            var secondMessageTask = Task.Run(
+                () => clientStatus.DeliverApplicationMessageAsync(
+                    new MqttApplicationMessageBuilder().WithTopic("InjectedTwo").Build()));
+            await LongTestDelay();
+
+            var thirdMessageTask = Task.Run(
+                () => clientStatus.DeliverApplicationMessageAsync(
+                    new MqttApplicationMessageBuilder().WithTopic("InjectedThree").Build()));
+            await LongTestDelay();
+
+            Task.WaitAny(firstMessageTask, secondMessageTask, thirdMessageTask);
+
+            // Due to the DropOldestQueuedMessage strategy, the second message delivery will fail
+            // to make room for the third one.
+            Assert.AreEqual(firstMessageTask.Status, TaskStatus.WaitingForActivation);
+            Assert.AreEqual(secondMessageTask.Status, TaskStatus.Faulted);
+            Assert.IsTrue(secondMessageTask.Exception?.InnerException is MqttPendingMessagesOverflowException);
+            Assert.AreEqual(thirdMessageTask.Status, TaskStatus.WaitingForActivation);
+
+            Assert.IsFalse(firstMessageEvicted);
+            Assert.IsTrue(secondMessageEvicted);
+            Assert.IsFalse(thirdMessageEvicted);
+        }
+
+        [TestMethod]
+        public async Task Inject_ApplicationMessage_At_Server_Level()
+        {
+            using var testEnvironment = CreateTestEnvironment();
+
+            var server = await testEnvironment.StartServer();
+
+            var receiver = await testEnvironment.ConnectClient();
+
+            var messageReceivedHandler = testEnvironment.CreateApplicationMessageHandler(receiver);
+
+            await receiver.SubscribeAsync("#");
+
+            var injectedApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build();
+
+            await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(injectedApplicationMessage));
+
+            await LongTestDelay();
+
+            Assert.AreEqual(1, messageReceivedHandler.ReceivedEventArgs.Count);
+            Assert.AreEqual("InjectedOne", messageReceivedHandler.ReceivedEventArgs[0].ApplicationMessage.Topic);
+        }
+
+        [TestMethod]
+        public async Task Intercept_Injected_Application_Message()
+        {
+            using var testEnvironment = CreateTestEnvironment();
+
+            var server = await testEnvironment.StartServer();
+
+            MqttApplicationMessage interceptedMessage = null;
+            server.InterceptingPublishAsync += eventArgs =>
+            {
+                interceptedMessage = eventArgs.ApplicationMessage;
+                return CompletedTask.Instance;
+            };
 
-                var injectedApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build();
-                await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(injectedApplicationMessage));
+            var injectedApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build();
+            await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(injectedApplicationMessage));
 
-                await LongTestDelay();
+            await LongTestDelay();
 
-                Assert.IsNotNull(interceptedMessage);
-            }
+            Assert.IsNotNull(interceptedMessage);
         }
     }
 }
\ No newline at end of file
diff --git a/Source/MQTTnet/InjectMqttApplicationMessageResult.cs b/Source/MQTTnet/InjectMqttApplicationMessageResult.cs
new file mode 100644
index 000000000..b2e6da802
--- /dev/null
+++ b/Source/MQTTnet/InjectMqttApplicationMessageResult.cs
@@ -0,0 +1,10 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+namespace MQTTnet;
+
+public class InjectMqttApplicationMessageResult
+{
+    public ushort PacketIdentifier { get; init; }
+}
\ No newline at end of file
diff --git a/Source/MQTTnet/Internal/MqttPacketBus.cs b/Source/MQTTnet/Internal/MqttPacketBus.cs
index 8aec37565..46e2b3493 100644
--- a/Source/MQTTnet/Internal/MqttPacketBus.cs
+++ b/Source/MQTTnet/Internal/MqttPacketBus.cs
@@ -141,14 +141,6 @@ public List<MqttPacket> ExportPackets(MqttPacketBusPartition partition)
             }
         }
 
-        public int ItemsCount(MqttPacketBusPartition partition)
-        {
-            lock (_syncRoot)
-            {
-                return _partitions[(int)partition].Count;
-            }
-        }
-
         public int PartitionItemsCount(MqttPacketBusPartition partition)
         {
             lock (_syncRoot)
diff --git a/Source/MQTTnet/Internal/MqttPacketBusItem.cs b/Source/MQTTnet/Internal/MqttPacketBusItem.cs
index b94654cf5..6fb5b114d 100644
--- a/Source/MQTTnet/Internal/MqttPacketBusItem.cs
+++ b/Source/MQTTnet/Internal/MqttPacketBusItem.cs
@@ -10,8 +10,8 @@ namespace MQTTnet.Internal
 {
     public sealed class MqttPacketBusItem
     {
-        readonly AsyncTaskCompletionSource<bool> _promise = new AsyncTaskCompletionSource<bool>();
-        
+        readonly AsyncTaskCompletionSource<MqttPacket> _promise = new AsyncTaskCompletionSource<MqttPacket>();
+
         public MqttPacketBusItem(MqttPacket packet)
         {
             Packet = packet ?? throw new ArgumentNullException(nameof(packet));
@@ -28,7 +28,7 @@ public void Cancel()
 
         public void Complete()
         {
-            _promise.TrySetResult(true);
+            _promise.TrySetResult(Packet);
             Completed?.Invoke(this, EventArgs.Empty);
         }
 
@@ -37,7 +37,7 @@ public void Fail(Exception exception)
             _promise.TrySetException(exception);
         }
 
-        public Task WaitAsync()
+        public Task<MqttPacket> WaitAsync()
         {
             return _promise.Task;
         }
diff --git a/Source/ReleaseNotes.md b/Source/ReleaseNotes.md
index 7c70587e3..e68ff4c39 100644
--- a/Source/ReleaseNotes.md
+++ b/Source/ReleaseNotes.md
@@ -9,6 +9,7 @@
 * Removal of Managed Client **(BREAKING CHANGE)**
 * Fixed missing release notes in nuget packages.
 
+
 * Client: MQTT 5.0.0 is now the default version when connecting with a server **(BREAKING CHANGE)**
 * Client: Fixed enhanced authentication.
 * Client: Exposed WebSocket compression options in MQTT client options (thanks to @victornor, #2127)
@@ -19,3 +20,4 @@
 * Server: Set SSL version to "None" which will let the OS choose the version **(BREAKING CHANGE)**
 * Server: Added API for getting a single session (thanks to @AntonSmolkov, #2131)
 * Server: Fixed "TryPrivate" (Mosquitto feature) handling (thanks to @victornor, #2125) **(BREAKING CHANGE)**
+* Server: Fixed dead lock when awaiting a packet transmission but the packet gets dropped due to quotas (#2117, thanks to @AntonSmolkov)