Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix hanging requests with filtered steal #3016

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
a0c317a
Integration test
Razz4780 Jan 14, 2025
f7359bc
Moved MetadataStore to a separate module
Razz4780 Jan 14, 2025
48bac27
bind_similar -> BoundTcpSocket
Razz4780 Jan 14, 2025
5798219
BodyExt -> BatchedBody, reworked trait
Razz4780 Jan 14, 2025
6e2d3bc
local HTTP handling rework
Razz4780 Jan 14, 2025
6a47585
Remove obsolete stuff from mirrord-protocol
Razz4780 Jan 14, 2025
4b79bf7
Moved HttpResponseFallback to the agent
Razz4780 Jan 14, 2025
3486e0a
Moved frame senders to InterceptorHandle
Razz4780 Jan 14, 2025
e5b5d93
HttpResponseReaders in IncomingProxy
Razz4780 Jan 14, 2025
520f5e9
Clippy
Razz4780 Jan 14, 2025
85940d7
Better tracing
Razz4780 Jan 14, 2025
f59a62c
HttpResponseReader logic split into methods
Razz4780 Jan 15, 2025
600d958
unless_bus_closed
Razz4780 Jan 15, 2025
8b15a86
I hate this
Razz4780 Jan 16, 2025
ab4fdb5
HttpGateway tests
Razz4780 Jan 16, 2025
d729c3b
ClientStore test
Razz4780 Jan 16, 2025
1a08649
Changed implementation of ClientStore shared state
Razz4780 Jan 16, 2025
c25b15e
Some docs
Razz4780 Jan 16, 2025
295099f
Docs
Razz4780 Jan 16, 2025
7508902
Removed obsolete integration test - replaced before with a unit test
Razz4780 Jan 16, 2025
203af97
More ClientStore tracing
Razz4780 Jan 16, 2025
39dee15
Less spammy debug for InProxyTaskMessage
Razz4780 Jan 16, 2025
609ecd1
Clippy and docs
Razz4780 Jan 16, 2025
20c6fc2
More tracing
Razz4780 Jan 16, 2025
053599c
Clippy
Razz4780 Jan 17, 2025
5c343fb
Fixed TcpProxyTask
Razz4780 Jan 17, 2025
0a346a4
More IncomingProxy docs
Razz4780 Jan 17, 2025
7458a38
macos tests fixed
Razz4780 Jan 17, 2025
7b8a14f
Fixed reverseportforwarder and its tests
Razz4780 Jan 17, 2025
78f47e8
Upgrade fixed
Razz4780 Jan 17, 2025
cfa95ff
Extended changelog
Razz4780 Jan 18, 2025
c612152
Frames doc
Razz4780 Jan 18, 2025
91a18c4
Helper function for BatchedBody
Razz4780 Jan 18, 2025
16a4da1
auto_responder -> unwrap instead of is_err + break
Razz4780 Jan 18, 2025
ec0fd2c
ClientStore unwrap -> expect
Razz4780 Jan 18, 2025
b09ea4a
Comments for ClientStore cleanup_task
Razz4780 Jan 18, 2025
7e91864
Closed doc
Razz4780 Jan 19, 2025
3ffd3d2
TcpStealApi::response_body_tx doc
Razz4780 Jan 19, 2025
d6fe566
Removed expect from client_store::cleanup_task
Razz4780 Jan 19, 2025
10981cb
Merge branch 'main' into michals/mbe-649-filtered-steal-hangs-on-play…
Razz4780 Jan 19, 2025
e437124
Doc lint
Razz4780 Jan 20, 2025
4fd0f6a
Update mirrord/intproxy/src/background_tasks.rs
Razz4780 Jan 20, 2025
ea247c9
Update mirrord/intproxy/src/proxies/incoming.rs
Razz4780 Jan 20, 2025
f6fad54
error -> unreachable
Razz4780 Jan 20, 2025
5ac91bc
pub(crate) for Closed
Razz4780 Jan 20, 2025
a02a339
Update mirrord/intproxy/src/proxies/incoming.rs
Razz4780 Jan 20, 2025
4a01232
not war
Razz4780 Jan 20, 2025
93bbeee
More doccc
Razz4780 Jan 20, 2025
d3d2898
self_address -> self
Razz4780 Jan 20, 2025
6342a43
rephrased error messages
Razz4780 Jan 20, 2025
b7c1c31
instrument on LocalHttpClient::new
Razz4780 Jan 20, 2025
dbe4418
More docs on clone for StreamingBody
Razz4780 Jan 20, 2025
123f310
docc
Razz4780 Jan 20, 2025
ff9d317
docsss
Razz4780 Jan 20, 2025
f2bc71c
Doc fixed
Razz4780 Jan 20, 2025
93b7baa
Doc fixed
Razz4780 Jan 20, 2025
9b22880
more instrument
Razz4780 Jan 20, 2025
271df63
in whole -> without a filter
Razz4780 Jan 21, 2025
f961054
moar doccc
Razz4780 Jan 21, 2025
b85878c
TPC -> TCP
Razz4780 Jan 21, 2025
216e2c7
added ignore to doctest
Razz4780 Jan 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions changelog.d/3013.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fixed an issue where HTTP requests stolen with a filter would hang with a single-threaded local HTTP server.
meowjesty marked this conversation as resolved.
Show resolved Hide resolved
Improved handling of incoming connections on the local machine (e.g introduces reuse of local HTTP connections).
4 changes: 3 additions & 1 deletion mirrord/agent/src/steal.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use mirrord_protocol::{
tcp::{DaemonTcp, HttpResponseFallback, StealType, TcpData},
tcp::{DaemonTcp, StealType, TcpData},
ConnectionId, Port,
};
use tokio::sync::mpsc::Sender;
Expand All @@ -17,6 +17,8 @@ mod subscriptions;
pub(crate) use api::TcpStealerApi;
pub(crate) use connection::TcpConnectionStealer;

use self::http::HttpResponseFallback;

/// Commands from the agent that are passed down to the stealer worker, through [`TcpStealerApi`].
///
/// These are the operations that the agent receives from the layer to make the _steal_ feature
Expand Down
23 changes: 15 additions & 8 deletions mirrord/agent/src/steal/api.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
use std::collections::HashMap;
use std::{collections::HashMap, convert::Infallible};

use bytes::Bytes;
use hyper::body::Frame;
use mirrord_protocol::{
tcp::{
ChunkedResponse, DaemonTcp, HttpResponse, HttpResponseFallback, InternalHttpResponse,
LayerTcpSteal, ReceiverStreamBody, TcpData,
},
tcp::{ChunkedResponse, DaemonTcp, HttpResponse, InternalHttpResponse, LayerTcpSteal, TcpData},
RequestId,
};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio_stream::wrappers::ReceiverStream;

use super::*;
use super::{http::ReceiverStreamBody, *};
use crate::{
error::{AgentError, Result},
util::ClientId,
watched_task::TaskStatus,
};

type ResponseBodyTx = Sender<Result<Frame<Bytes>, Infallible>>;

/// Bridges the communication between the agent and the [`TcpConnectionStealer`] task.
/// There is an API instance for each connected layer ("client"). All API instances send commands
/// On the same stealer command channel, where the layer-independent stealer listens to them.
Expand All @@ -40,7 +39,15 @@ pub(crate) struct TcpStealerApi {
/// View on the stealer task's status.
task_status: TaskStatus,

response_body_txs: HashMap<(ConnectionId, RequestId), Sender<hyper::Result<Frame<Bytes>>>>,
/// [`Sender`]s that allow us to provide body [`Frame`]s of responses to filtered HTTP
/// requests.
///
/// With [`LayerTcpSteal::HttpResponseChunked`], response bodies come from the client
/// in a series of [`ChunkedResponse::Body`] messages.
///
/// Thus, we use [`ReceiverStreamBody`] for [`Response`](hyper::Response)'s body type and
/// pipe the [`Frame`]s through an [`mpsc::channel`].
response_body_txs: HashMap<(ConnectionId, RequestId), ResponseBodyTx>,
}
meowjesty marked this conversation as resolved.
Show resolved Hide resolved

impl TcpStealerApi {
Expand Down Expand Up @@ -196,7 +203,7 @@ impl TcpStealerApi {
let key = (response.connection_id, response.request_id);
self.response_body_txs.insert(key, tx.clone());

self.http_response(HttpResponseFallback::Streamed(http_response, None))
self.http_response(HttpResponseFallback::Streamed(http_response))
.await?;

for frame in response.internal_response.body {
Expand Down
54 changes: 15 additions & 39 deletions mirrord/agent/src/steal/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@ use hyper::{
http::{header::UPGRADE, request::Parts},
};
use mirrord_protocol::{
body_chunks::{BodyExt as _, Frames},
batched_body::{BatchedBody, Frames},
tcp::{
ChunkedHttpBody, ChunkedHttpError, ChunkedRequest, DaemonTcp, HttpRequest,
HttpResponseFallback, InternalHttpBody, InternalHttpBodyFrame, InternalHttpRequest,
StealType, TcpClose, TcpData, HTTP_CHUNKED_REQUEST_VERSION, HTTP_FILTERED_UPGRADE_VERSION,
HTTP_FRAMED_VERSION,
InternalHttpBody, InternalHttpBodyFrame, InternalHttpRequest, StealType, TcpClose, TcpData,
HTTP_CHUNKED_REQUEST_VERSION, HTTP_FILTERED_UPGRADE_VERSION, HTTP_FRAMED_VERSION,
},
ConnectionId, Port,
RemoteError::{BadHttpFilterExRegex, BadHttpFilterRegex},
Expand All @@ -31,6 +30,7 @@ use tokio::{
use tokio_util::sync::CancellationToken;
use tracing::warn;

use super::http::HttpResponseFallback;
use crate::{
error::{AgentError, Result},
steal::{
Expand Down Expand Up @@ -173,7 +173,7 @@ impl Client {
},
mut body,
) = request.request.into_parts();
match body.next_frames(true).await {
match body.ready_frames() {
Err(..) => return,
// We don't check is_last here since loop will finish when body.next_frames()
// returns None
Expand Down Expand Up @@ -205,7 +205,7 @@ impl Client {
}

loop {
match body.next_frames(false).await {
match body.next_frames().await {
Ok(Frames { frames, is_last }) => {
let frames = frames
.into_iter()
Expand Down Expand Up @@ -599,40 +599,16 @@ impl TcpConnectionStealer {
async fn send_http_response(&mut self, client_id: ClientId, response: HttpResponseFallback) {
let connection_id = response.connection_id();
let request_id = response.request_id();

match response.into_hyper::<hyper::Error>() {
Ok(response) => {
self.connections
.send(
connection_id,
ConnectionMessageIn::Response {
client_id,
request_id,
response,
},
)
.await;
}
Err(error) => {
tracing::warn!(
?error,
connection_id,
request_id,
self.connections
.send(
connection_id,
ConnectionMessageIn::Response {
client_id,
"Failed to transform client message into a hyper response",
);

self.connections
.send(
connection_id,
ConnectionMessageIn::ResponseFailed {
client_id,
request_id,
},
)
.await;
}
}
request_id,
response: response.into_hyper::<hyper::Error>(),
},
)
.await;
}

/// Handles [`Command`]s that were received by [`TcpConnectionStealer::command_rx`].
Expand Down
7 changes: 4 additions & 3 deletions mirrord/agent/src/steal/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
use crate::http::HttpVersion;

mod filter;
mod response_fallback;
mod reversible_stream;

pub use filter::HttpFilter;

pub(crate) use self::reversible_stream::ReversibleStream;
pub(crate) use filter::HttpFilter;
pub(crate) use response_fallback::{HttpResponseFallback, ReceiverStreamBody};
pub(crate) use reversible_stream::ReversibleStream;

/// Handy alias due to [`ReversibleStream`] being generic, avoiding value mismatches.
pub(crate) type DefaultReversibleStream = ReversibleStream<{ HttpVersion::MINIMAL_HEADER_SIZE }>;
meowjesty marked this conversation as resolved.
Show resolved Hide resolved
58 changes: 58 additions & 0 deletions mirrord/agent/src/steal/http/response_fallback.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::convert::Infallible;

use bytes::Bytes;
use http_body_util::{combinators::BoxBody, BodyExt, Full, StreamBody};
use hyper::{body::Frame, Response};
use mirrord_protocol::{
tcp::{HttpResponse, InternalHttpBody},
ConnectionId, RequestId,
};
use tokio_stream::wrappers::ReceiverStream;

pub type ReceiverStreamBody = StreamBody<ReceiverStream<Result<Frame<Bytes>, Infallible>>>;

#[derive(Debug)]
pub enum HttpResponseFallback {
Framed(HttpResponse<InternalHttpBody>),
Fallback(HttpResponse<Vec<u8>>),
Streamed(HttpResponse<ReceiverStreamBody>),
}

impl HttpResponseFallback {
pub fn connection_id(&self) -> ConnectionId {
match self {
HttpResponseFallback::Framed(req) => req.connection_id,
HttpResponseFallback::Fallback(req) => req.connection_id,
HttpResponseFallback::Streamed(req) => req.connection_id,
}
}

pub fn request_id(&self) -> RequestId {
match self {
HttpResponseFallback::Framed(req) => req.request_id,
HttpResponseFallback::Fallback(req) => req.request_id,
HttpResponseFallback::Streamed(req) => req.request_id,
}
}

pub fn into_hyper<E>(self) -> Response<BoxBody<Bytes, E>> {
match self {
HttpResponseFallback::Framed(req) => req
.internal_response
.map_body(|body| body.map_err(|_| unreachable!()).boxed())
meowjesty marked this conversation as resolved.
Show resolved Hide resolved
.into(),
HttpResponseFallback::Fallback(req) => req
.internal_response
.map_body(|body| {
Full::new(Bytes::from_owner(body))
.map_err(|_| unreachable!())
.boxed()
})
.into(),
HttpResponseFallback::Streamed(req) => req
.internal_response
.map_body(|body| body.map_err(|_| unreachable!()).boxed())
.into(),
}
}
}
Loading
Loading