diff --git a/changelog.d/2590.fixed.md b/changelog.d/2590.fixed.md new file mode 100644 index 00000000000..a3fc1cf2747 --- /dev/null +++ b/changelog.d/2590.fixed.md @@ -0,0 +1 @@ +Fix empty request streaming hanging forever \ No newline at end of file diff --git a/mirrord/agent/src/steal/connection.rs b/mirrord/agent/src/steal/connection.rs index cdb74979737..f24d7736a4e 100644 --- a/mirrord/agent/src/steal/connection.rs +++ b/mirrord/agent/src/steal/connection.rs @@ -27,6 +27,7 @@ use tokio::{ sync::mpsc::{Receiver, Sender}, }; use tokio_util::sync::CancellationToken; +use tracing::warn; use crate::{ error::{AgentError, Result}, @@ -154,6 +155,7 @@ impl Client { let tx = self.tx.clone(); tokio::spawn(async move { + tracing::trace!(?request.connection_id, ?request.request_id, ?chunked, ?framed, "starting request"); // Chunked data is preferred over framed data if chunked { // Send headers @@ -171,7 +173,9 @@ impl Client { ) = request.request.into_parts(); match body.next_frames(true).await { Err(..) => return, - Ok(Frames { frames, is_last }) => { + // We don't check is_last here since loop will finish when body.next_frames() + // returns None + Ok(Frames { frames, .. }) => { let frames = frames .into_iter() .map(InternalHttpBodyFrame::try_from) @@ -190,7 +194,9 @@ impl Client { request_id, port: request.port, })); - if tx.send(message).await.is_err() || is_last { + + if let Err(e) = tx.send(message).await { + warn!(?e, ?connection_id, ?request_id, ?request.port, "failed to send chunked request start"); return; } } @@ -212,7 +218,13 @@ impl Client { request_id, }, )); - if tx.send(message).await.is_err() || is_last { + + if let Err(e) = tx.send(message).await { + warn!(?e, ?connection_id, ?request_id, ?request.port, "failed to send chunked request body"); + return; + } + + if is_last { return; } }