Skip to content

Commit

Permalink
Fix hanging requests with filtered steal (#3016)
Browse files Browse the repository at this point in the history
* Integration test

* Moved MetadataStore to a separate module

* bind_similar -> BoundTcpSocket

* BodyExt -> BatchedBody, reworked trait

* local HTTP handling rework

* Remove obsolete stuff from mirrord-protocol

* Moved HttpResponseFallback to the agent

* Moved frame senders to InterceptorHandle

* HttpResponseReaders in IncomingProxy

* Clippy

* Better tracing

* HttpResponseReader logic split into methods

* unless_bus_closed

* I hate this

* HttpGateway tests

* ClientStore test

* Changed implementation of ClientStore shared state

* Some docs

* Docs

* Removed obsolete integration test - replaced before with a unit test

* More ClientStore tracing

* Less spammy debug for InProxyTaskMessage

* Clippy and docs

* More tracing

* Clippy

* Fixed TcpProxyTask

* More IncomingProxy docs

* macos tests fixed

* Fixed reverseportforwarder and its tests

* Upgrade fixed

* Extended changelog

* Frames doc

* Helper function for BatchedBody

* auto_responder -> unwrap instead of is_err + break

* ClientStore unwrap -> expect

* Comments for ClientStore cleanup_task

* Closed doc

* TcpStealApi::response_body_tx doc

* Removed expect from client_store::cleanup_task

* Doc lint

* Update mirrord/intproxy/src/background_tasks.rs

Co-authored-by: meowjesty <[email protected]>

* Update mirrord/intproxy/src/proxies/incoming.rs

Co-authored-by: meowjesty <[email protected]>

* error -> unreachable

* pub(crate) for Closed

* Update mirrord/intproxy/src/proxies/incoming.rs

Co-authored-by: meowjesty <[email protected]>

* not war

* More doccc

* self_address -> self

* rephrased error messages

* instrument on LocalHttpClient::new

* More docs on clone for StreamingBody

* docc

* docsss

* Doc fixed

* Doc fixed

* more instrument

* in whole -> without a filter

* moar doccc

* TPC -> TCP

* added ignore to doctest

---------

Co-authored-by: meowjesty <[email protected]>
  • Loading branch information
Razz4780 and meowjesty authored Jan 21, 2025
1 parent 78f9570 commit 2ee5a2c
Show file tree
Hide file tree
Showing 31 changed files with 3,185 additions and 2,695 deletions.
6 changes: 1 addition & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions changelog.d/3013.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fixed an issue where HTTP requests stolen with a filter would hang with a single-threaded local HTTP server.
Improved handling of incoming connections on the local machine (e.g. introduces reuse of local HTTP connections).
4 changes: 3 additions & 1 deletion mirrord/agent/src/steal.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use mirrord_protocol::{
tcp::{DaemonTcp, HttpResponseFallback, StealType, TcpData},
tcp::{DaemonTcp, StealType, TcpData},
ConnectionId, Port,
};
use tokio::sync::mpsc::Sender;
Expand All @@ -17,6 +17,8 @@ mod subscriptions;
pub(crate) use api::TcpStealerApi;
pub(crate) use connection::TcpConnectionStealer;

use self::http::HttpResponseFallback;

/// Commands from the agent that are passed down to the stealer worker, through [`TcpStealerApi`].
///
/// These are the operations that the agent receives from the layer to make the _steal_ feature
Expand Down
23 changes: 15 additions & 8 deletions mirrord/agent/src/steal/api.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
use std::collections::HashMap;
use std::{collections::HashMap, convert::Infallible};

use bytes::Bytes;
use hyper::body::Frame;
use mirrord_protocol::{
tcp::{
ChunkedResponse, DaemonTcp, HttpResponse, HttpResponseFallback, InternalHttpResponse,
LayerTcpSteal, ReceiverStreamBody, TcpData,
},
tcp::{ChunkedResponse, DaemonTcp, HttpResponse, InternalHttpResponse, LayerTcpSteal, TcpData},
RequestId,
};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio_stream::wrappers::ReceiverStream;

use super::*;
use super::{http::ReceiverStreamBody, *};
use crate::{
error::{AgentError, Result},
util::ClientId,
watched_task::TaskStatus,
};

type ResponseBodyTx = Sender<Result<Frame<Bytes>, Infallible>>;

/// Bridges the communication between the agent and the [`TcpConnectionStealer`] task.
/// There is an API instance for each connected layer ("client"). All API instances send commands
/// on the same stealer command channel, where the layer-independent stealer listens to them.
Expand All @@ -40,7 +39,15 @@ pub(crate) struct TcpStealerApi {
/// View on the stealer task's status.
task_status: TaskStatus,

response_body_txs: HashMap<(ConnectionId, RequestId), Sender<hyper::Result<Frame<Bytes>>>>,
/// [`Sender`]s that allow us to provide body [`Frame`]s of responses to filtered HTTP
/// requests.
///
/// With [`LayerTcpSteal::HttpResponseChunked`], response bodies come from the client
/// in a series of [`ChunkedResponse::Body`] messages.
///
/// Thus, we use [`ReceiverStreamBody`] for [`Response`](hyper::Response)'s body type and
/// pipe the [`Frame`]s through an [`mpsc::channel`].
response_body_txs: HashMap<(ConnectionId, RequestId), ResponseBodyTx>,
}

impl TcpStealerApi {
Expand Down Expand Up @@ -196,7 +203,7 @@ impl TcpStealerApi {
let key = (response.connection_id, response.request_id);
self.response_body_txs.insert(key, tx.clone());

self.http_response(HttpResponseFallback::Streamed(http_response, None))
self.http_response(HttpResponseFallback::Streamed(http_response))
.await?;

for frame in response.internal_response.body {
Expand Down
54 changes: 15 additions & 39 deletions mirrord/agent/src/steal/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@ use hyper::{
http::{header::UPGRADE, request::Parts},
};
use mirrord_protocol::{
body_chunks::{BodyExt as _, Frames},
batched_body::{BatchedBody, Frames},
tcp::{
ChunkedHttpBody, ChunkedHttpError, ChunkedRequest, DaemonTcp, HttpRequest,
HttpResponseFallback, InternalHttpBody, InternalHttpBodyFrame, InternalHttpRequest,
StealType, TcpClose, TcpData, HTTP_CHUNKED_REQUEST_VERSION, HTTP_FILTERED_UPGRADE_VERSION,
HTTP_FRAMED_VERSION,
InternalHttpBody, InternalHttpBodyFrame, InternalHttpRequest, StealType, TcpClose, TcpData,
HTTP_CHUNKED_REQUEST_VERSION, HTTP_FILTERED_UPGRADE_VERSION, HTTP_FRAMED_VERSION,
},
ConnectionId, Port,
RemoteError::{BadHttpFilterExRegex, BadHttpFilterRegex},
Expand All @@ -31,6 +30,7 @@ use tokio::{
use tokio_util::sync::CancellationToken;
use tracing::warn;

use super::http::HttpResponseFallback;
use crate::{
error::{AgentError, Result},
steal::{
Expand Down Expand Up @@ -173,7 +173,7 @@ impl Client {
},
mut body,
) = request.request.into_parts();
match body.next_frames(true).await {
match body.ready_frames() {
Err(..) => return,
// We don't check is_last here since loop will finish when body.next_frames()
// returns None
Expand Down Expand Up @@ -205,7 +205,7 @@ impl Client {
}

loop {
match body.next_frames(false).await {
match body.next_frames().await {
Ok(Frames { frames, is_last }) => {
let frames = frames
.into_iter()
Expand Down Expand Up @@ -599,40 +599,16 @@ impl TcpConnectionStealer {
async fn send_http_response(&mut self, client_id: ClientId, response: HttpResponseFallback) {
let connection_id = response.connection_id();
let request_id = response.request_id();

match response.into_hyper::<hyper::Error>() {
Ok(response) => {
self.connections
.send(
connection_id,
ConnectionMessageIn::Response {
client_id,
request_id,
response,
},
)
.await;
}
Err(error) => {
tracing::warn!(
?error,
connection_id,
request_id,
self.connections
.send(
connection_id,
ConnectionMessageIn::Response {
client_id,
"Failed to transform client message into a hyper response",
);

self.connections
.send(
connection_id,
ConnectionMessageIn::ResponseFailed {
client_id,
request_id,
},
)
.await;
}
}
request_id,
response: response.into_hyper::<hyper::Error>(),
},
)
.await;
}

/// Handles [`Command`]s that were received by [`TcpConnectionStealer::command_rx`].
Expand Down
7 changes: 4 additions & 3 deletions mirrord/agent/src/steal/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
use crate::http::HttpVersion;

mod filter;
mod response_fallback;
mod reversible_stream;

pub use filter::HttpFilter;

pub(crate) use self::reversible_stream::ReversibleStream;
pub(crate) use filter::HttpFilter;
pub(crate) use response_fallback::{HttpResponseFallback, ReceiverStreamBody};
pub(crate) use reversible_stream::ReversibleStream;

/// Handy alias due to [`ReversibleStream`] being generic, avoiding value mismatches.
pub(crate) type DefaultReversibleStream = ReversibleStream<{ HttpVersion::MINIMAL_HEADER_SIZE }>;
58 changes: 58 additions & 0 deletions mirrord/agent/src/steal/http/response_fallback.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::convert::Infallible;

use bytes::Bytes;
use http_body_util::{combinators::BoxBody, BodyExt, Full, StreamBody};
use hyper::{body::Frame, Response};
use mirrord_protocol::{
tcp::{HttpResponse, InternalHttpBody},
ConnectionId, RequestId,
};
use tokio_stream::wrappers::ReceiverStream;

/// HTTP body backed by a channel: a [`StreamBody`] wrapping a [`ReceiverStream`] that yields
/// body [`Frame`]s of [`Bytes`].
///
/// The stream's item error type is [`Infallible`], so items of this stream never carry an error.
pub type ReceiverStreamBody = StreamBody<ReceiverStream<Result<Frame<Bytes>, Infallible>>>;

/// An [`HttpResponse`] in one of the supported body representations.
///
/// Every variant carries the same response envelope and differs only in the body type.
/// Use [`HttpResponseFallback::into_hyper`] to turn any variant into a hyper [`Response`].
#[derive(Debug)]
pub enum HttpResponseFallback {
/// Response whose body is an [`InternalHttpBody`].
Framed(HttpResponse<InternalHttpBody>),
/// Response whose body is a plain byte buffer.
Fallback(HttpResponse<Vec<u8>>),
/// Response whose body is a [`ReceiverStreamBody`], i.e. frames are read from a stream.
Streamed(HttpResponse<ReceiverStreamBody>),
}

impl HttpResponseFallback {
    /// Returns the [`ConnectionId`] stored in the wrapped [`HttpResponse`], regardless of the
    /// variant.
    pub fn connection_id(&self) -> ConnectionId {
        match self {
            Self::Framed(response) => response.connection_id,
            Self::Fallback(response) => response.connection_id,
            Self::Streamed(response) => response.connection_id,
        }
    }

    /// Returns the [`RequestId`] stored in the wrapped [`HttpResponse`], regardless of the
    /// variant.
    pub fn request_id(&self) -> RequestId {
        match self {
            Self::Framed(response) => response.request_id,
            Self::Fallback(response) => response.request_id,
            Self::Streamed(response) => response.request_id,
        }
    }

    /// Consumes this response and converts it into a hyper [`Response`] with a [`BoxBody`].
    ///
    /// The body error type `E` is chosen by the caller. Each variant's body is adapted to it
    /// with a `map_err` closure that calls `unreachable!()`, i.e. the underlying bodies are
    /// treated as never producing an error.
    pub fn into_hyper<E>(self) -> Response<BoxBody<Bytes, E>> {
        match self {
            Self::Framed(response) => {
                let internal = response.internal_response;
                internal
                    .map_body(|frames| frames.map_err(|_| unreachable!()).boxed())
                    .into()
            }
            Self::Fallback(response) => {
                let internal = response.internal_response;
                internal
                    .map_body(|raw| {
                        // The whole byte buffer becomes a single-chunk body.
                        let full = Full::new(Bytes::from_owner(raw));
                        full.map_err(|_| unreachable!()).boxed()
                    })
                    .into()
            }
            Self::Streamed(response) => {
                let internal = response.internal_response;
                internal
                    .map_body(|stream| stream.map_err(|_| unreachable!()).boxed())
                    .into()
            }
        }
    }
}
Loading

0 comments on commit 2ee5a2c

Please sign in to comment.