From 04134383511ccad16e165cbe0f3b188268b11244 Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Sun, 8 Dec 2024 10:00:51 +0100 Subject: [PATCH] Make sure corresponding Connection and Channel token properly float into handlers --- projects/RabbitMQ.Client/Impl/Channel.cs | 6 +++--- .../RabbitMQ.Client/Impl/Connection.Receive.cs | 15 ++++++++++----- projects/RabbitMQ.Client/Impl/Connection.cs | 4 ++-- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index 64be4152e..51c8f0ec5 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -196,7 +196,7 @@ protected void TakeOver(Channel other) public Task CloseAsync(ushort replyCode, string replyText, bool abort, CancellationToken cancellationToken) { - var args = new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText); + var args = new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText, cancellationToken: cancellationToken); return CloseAsync(args, abort, cancellationToken); } @@ -725,7 +725,7 @@ await Session.Connection.HandleConnectionBlockedAsync(reason, cancellationToken) protected async Task HandleConnectionCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken) { var method = new ConnectionClose(cmd.MethodSpan); - var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, method._replyCode, method._replyText, method._classId, method._methodId); + var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, method._replyCode, method._replyText, method._classId, method._methodId, cancellationToken: cancellationToken); try { await Session.Connection.ClosedViaPeerAsync(reason, cancellationToken) @@ -763,7 +763,7 @@ protected async Task HandleConnectionStartAsync(IncomingCommand cmd, Cance { if (m_connectionStartCell is null) { - var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.CommandInvalid, "Unexpected Connection.Start"); + var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.CommandInvalid, "Unexpected Connection.Start", cancellationToken: cancellationToken); await Session.Connection.CloseAsync(reason, false, InternalConstants.DefaultConnectionCloseTimeout, cancellationToken) diff --git a/projects/RabbitMQ.Client/Impl/Connection.Receive.cs b/projects/RabbitMQ.Client/Impl/Connection.Receive.cs index 9a3ad49b1..07951d144 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.Receive.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.Receive.cs @@ -62,7 +62,8 @@ await ReceiveLoopAsync(mainLoopToken) var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, "Thread aborted (AppDomain unloaded?)", - exception: taex); + exception: taex, + cancellationToken: mainLoopToken); await HandleMainLoopExceptionAsync(ea) .ConfigureAwait(false); } @@ -73,7 +74,8 @@ await HandleMainLoopExceptionAsync(ea) var ea = new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", - exception: eose); + exception: eose, + cancellationToken: mainLoopToken); await HandleMainLoopExceptionAsync(ea) .ConfigureAwait(false); } @@ -91,7 +93,8 @@ await HardProtocolExceptionHandlerAsync(hpe, mainLoopToken) var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, fileLoadException.Message, - exception: fileLoadException); + exception: fileLoadException, + cancellationToken: mainLoopToken); await HandleMainLoopExceptionAsync(ea) .ConfigureAwait(false); } @@ -106,7 +109,8 @@ await HandleMainLoopExceptionAsync(ea) var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, ocex.Message, - exception: ocex); + exception: ocex, + cancellationToken: mainLoopToken); await HandleMainLoopExceptionAsync(ea) .ConfigureAwait(false); } @@ -116,7 +120,8 @@ await HandleMainLoopExceptionAsync(ea) var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, ex.Message, - exception: ex); + exception: ex, + cancellationToken: mainLoopToken); await HandleMainLoopExceptionAsync(ea) .ConfigureAwait(false); } diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index af26bb7ac..6444c7997 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -252,7 +252,7 @@ await _channel0.ConnectionOpenAsync(_config.VirtualHost, cancellationToken) { try { - var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, "FailedOpen"); + var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, "FailedOpen", cancellationToken: cancellationToken); await CloseAsync(ea, true, InternalConstants.DefaultConnectionAbortTimeout, cancellationToken).ConfigureAwait(false); @@ -297,7 +297,7 @@ internal void EnsureIsOpen() public Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort, CancellationToken cancellationToken = default) { - var reason = new ShutdownEventArgs(ShutdownInitiator.Application, reasonCode, reasonText); + var reason = new ShutdownEventArgs(ShutdownInitiator.Application, reasonCode, reasonText, cancellationToken: cancellationToken); return CloseAsync(reason, abort, timeout, cancellationToken); }