From 5c0d04a06b47fcf356ff2f59d1167aa962ecbc47 Mon Sep 17 00:00:00 2001 From: Aviram Hassan Date: Fri, 12 Jul 2024 09:12:03 +0300 Subject: [PATCH 1/4] Fix empty request streaming hanging forever --- mirrord/agent/src/steal/connection.rs | 28 +++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/mirrord/agent/src/steal/connection.rs b/mirrord/agent/src/steal/connection.rs index cdb74979737..0de0f2fca72 100644 --- a/mirrord/agent/src/steal/connection.rs +++ b/mirrord/agent/src/steal/connection.rs @@ -27,6 +27,7 @@ use tokio::{ sync::mpsc::{Receiver, Sender}, }; use tokio_util::sync::CancellationToken; +use tracing::warn; use crate::{ error::{AgentError, Result}, @@ -169,6 +170,7 @@ impl Client { }, mut body, ) = request.request.into_parts(); + tracing::trace!(?connection_id, "starting request"); match body.next_frames(true).await { Err(..) => return, Ok(Frames { frames, is_last }) => { @@ -190,9 +192,25 @@ impl Client { request_id, port: request.port, })); - if tx.send(message).await.is_err() || is_last { + + if let Err(e) = tx.send(message).await { + warn!(?e, ?connection_id, ?request_id, ?request.port, "failed to send chunked request start"); return; } + if is_last { + let message = DaemonTcp::HttpRequestChunked(ChunkedRequest::Body( + ChunkedHttpBody { + frames: Vec::new(), + is_last, + connection_id, + request_id, + }, + )); + if let Err(e) = tx.send(message).await { + warn!(?e, ?connection_id, ?request_id, ?request.port, "failed to send chunked request empty body"); + return; + } + } } } @@ -212,7 +230,13 @@ impl Client { request_id, }, )); - if tx.send(message).await.is_err() || is_last { + + if let Err(e) = tx.send(message).await { + warn!(?e, ?connection_id, ?request_id, ?request.port, "failed to send chunked request body"); + return; + } + + if is_last { return; } } From ddb5a56968dff1f18ae2e84747ce7fccffa80538 Mon Sep 17 00:00:00 2001 From: Aviram Hassan Date: Fri, 12 Jul 2024 09:14:57 +0300 Subject: [PATCH 2/4] changelog --- changelog.d/2590.fixed.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/2590.fixed.md diff --git a/changelog.d/2590.fixed.md b/changelog.d/2590.fixed.md new file mode 100644 index 00000000000..a3fc1cf2747 --- /dev/null +++ b/changelog.d/2590.fixed.md @@ -0,0 +1 @@ +Fix empty request streaming hanging forever \ No newline at end of file From fb082994ff3658d4bc28f7e46472a2e19e93f85c Mon Sep 17 00:00:00 2001 From: Aviram Hassan Date: Fri, 12 Jul 2024 09:19:02 +0300 Subject: [PATCH 3/4] cr --- mirrord/agent/src/steal/connection.rs | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/mirrord/agent/src/steal/connection.rs b/mirrord/agent/src/steal/connection.rs index 0de0f2fca72..10a3b0605f3 100644 --- a/mirrord/agent/src/steal/connection.rs +++ b/mirrord/agent/src/steal/connection.rs @@ -155,6 +155,7 @@ impl Client { let tx = self.tx.clone(); tokio::spawn(async move { + tracing::trace!(?connection_id, ?chunked, ?framed, "starting request"); // Chunked data is preferred over framed data if chunked { // Send headers @@ -170,10 +171,11 @@ impl Client { }, mut body, ) = request.request.into_parts(); - tracing::trace!(?connection_id, "starting request"); match body.next_frames(true).await { Err(..) => return, - Ok(Frames { frames, is_last }) => { + // We don't check is_last here since loop will finish when body.next_frames() + // returns None + Ok(Frames { frames, .. }) => { let frames = frames .into_iter() .map(InternalHttpBodyFrame::try_from) @@ -197,20 +199,6 @@ impl Client { warn!(?e, ?connection_id, ?request_id, ?request.port, "failed to send chunked request start"); return; } - if is_last { - let message = DaemonTcp::HttpRequestChunked(ChunkedRequest::Body( - ChunkedHttpBody { - frames: Vec::new(), - is_last, - connection_id, - request_id, - }, - )); - if let Err(e) = tx.send(message).await { - warn!(?e, ?connection_id, ?request_id, ?request.port, "failed to send chunked request empty body"); - return; - } - } } } From cd5a9b28c4181ab6dc671c651607d0c935105d69 Mon Sep 17 00:00:00 2001 From: Aviram Hassan Date: Fri, 12 Jul 2024 09:21:07 +0300 Subject: [PATCH 4/4] cr --- mirrord/agent/src/steal/connection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mirrord/agent/src/steal/connection.rs b/mirrord/agent/src/steal/connection.rs index 10a3b0605f3..f24d7736a4e 100644 --- a/mirrord/agent/src/steal/connection.rs +++ b/mirrord/agent/src/steal/connection.rs @@ -155,7 +155,7 @@ impl Client { let tx = self.tx.clone(); tokio::spawn(async move { - tracing::trace!(?connection_id, ?chunked, ?framed, "starting request"); + tracing::trace!(?request.connection_id, ?request.request_id, ?chunked, ?framed, "starting request"); // Chunked data is preferred over framed data if chunked { // Send headers