Skip to content

Commit

Permalink
Fixed streaming HTTP responses (#2563)
Browse files Browse the repository at this point in the history
* Fixed picking correct HttpResponseFallback variant in intproxy

* Changelog entry
  • Loading branch information
Razz4780 authored Jul 2, 2024
1 parent 6c8249a commit 2c2dfdc
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 19 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions changelog.d/2562.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Correct version of HTTP response is sent based on agent protocol version.
4 changes: 2 additions & 2 deletions mirrord/agent/src/steal/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use mirrord_protocol::{
tcp::{
ChunkedHttpBody, ChunkedHttpError, ChunkedRequest, DaemonTcp, HttpRequest,
HttpResponseFallback, InternalHttpBody, InternalHttpBodyFrame, InternalHttpRequest,
StealType, TcpClose, TcpData, HTTP_CHUNKED_VERSION, HTTP_FILTERED_UPGRADE_VERSION,
StealType, TcpClose, TcpData, HTTP_CHUNKED_REQUEST_VERSION, HTTP_FILTERED_UPGRADE_VERSION,
HTTP_FRAMED_VERSION,
},
ConnectionId, Port,
Expand Down Expand Up @@ -150,7 +150,7 @@ impl Client {
}

let framed = HTTP_FRAMED_VERSION.matches(&self.protocol_version);
let chunked = HTTP_CHUNKED_VERSION.matches(&self.protocol_version);
let chunked = HTTP_CHUNKED_REQUEST_VERSION.matches(&self.protocol_version);
let tx = self.tx.clone();

tokio::spawn(async move {
Expand Down
1 change: 1 addition & 0 deletions mirrord/intproxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mirrord-protocol = { path = "../protocol" }
mirrord-intproxy-protocol = { path = "./protocol", features = ["codec-async"] }
mirrord-analytics = { path = "../analytics" }

semver.workspace = true
serde.workspace = true
thiserror.workspace = true
tokio.workspace = true
Expand Down
5 changes: 5 additions & 0 deletions mirrord/intproxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,11 @@ impl IntProxy {
if CLIENT_READY_FOR_LOGS.matches(&protocol_version) {
self.task_txs.agent.send(ClientMessage::ReadyForLogs).await;
}

self.task_txs
.incoming
.send(IncomingProxyMessage::AgentProtocolVersion(protocol_version))
.await;
}
DaemonMessage::LogMessage(log) => match log.level {
LogLevel::Error => tracing::error!("agent log: {}", log.message),
Expand Down
19 changes: 17 additions & 2 deletions mirrord/intproxy/src/proxies/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ pub enum IncomingProxyMessage {
LayerClosed(LayerClosed),
AgentMirror(DaemonTcp),
AgentSteal(DaemonTcp),
/// Agent responded to [`ClientMessage::SwitchProtocolVersion`].
AgentProtocolVersion(semver::Version),
}

/// Handle for an [`Interceptor`].
Expand Down Expand Up @@ -167,6 +169,8 @@ pub struct IncomingProxy {
request_body_txs: HashMap<(ConnectionId, RequestId), Sender<InternalHttpBodyFrame>>,
/// For managing streamed [`LayerTcpSteal::HttpResponseChunked`] response streams.
response_body_rxs: StreamMap<(ConnectionId, RequestId), StreamNotifyClose<ReceiverStreamBody>>,
/// Version of [`mirrord_protocol`] negotiated with the agent.
agent_protocol_version: Option<semver::Version>,
}

impl IncomingProxy {
Expand Down Expand Up @@ -234,7 +238,11 @@ impl IncomingProxy {
let interceptor_socket = bind_similar(subscription.listening_on)?;

let interceptor = self.background_tasks.register(
Interceptor::new(interceptor_socket, subscription.listening_on),
Interceptor::new(
interceptor_socket,
subscription.listening_on,
self.agent_protocol_version.clone(),
),
id,
Self::CHANNEL_SIZE,
);
Expand Down Expand Up @@ -382,7 +390,11 @@ impl IncomingProxy {
);

let interceptor = self.background_tasks.register(
Interceptor::new(interceptor_socket, subscription.listening_on),
Interceptor::new(
interceptor_socket,
subscription.listening_on,
self.agent_protocol_version.clone(),
),
id,
Self::CHANNEL_SIZE,
);
Expand Down Expand Up @@ -497,6 +509,9 @@ impl BackgroundTask for IncomingProxy {
}
Some(IncomingProxyMessage::LayerClosed(msg)) => self.handle_layer_close(msg, message_bus).await,
Some(IncomingProxyMessage::LayerForked(msg)) => self.handle_layer_fork(msg),
Some(IncomingProxyMessage::AgentProtocolVersion(version)) => {
self.agent_protocol_version.replace(version);
}
},

Some(task_update) = self.background_tasks.next() => match task_update {
Expand Down
67 changes: 62 additions & 5 deletions mirrord/intproxy/src/proxies/incoming/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use hyper::{upgrade::OnUpgrade, StatusCode, Version};
use hyper_util::rt::TokioIo;
use mirrord_protocol::tcp::{
HttpRequestFallback, HttpResponse, HttpResponseFallback, InternalHttpBody, ReceiverStreamBody,
HTTP_CHUNKED_RESPONSE_VERSION,
};
use thiserror::Error;
use tokio::{
Expand Down Expand Up @@ -86,8 +87,12 @@ pub type InterceptorResult<T, E = InterceptorError> = core::result::Result<T, E>
/// When it receives [`MessageIn::Raw`], it starts acting as a simple proxy.
/// When it received [`MessageIn::Http`], it starts acting as an HTTP gateway.
pub struct Interceptor {
/// Socket that should be used to make the first connection (should already be bound).
socket: TcpSocket,
/// Address of user app's listener.
peer: SocketAddr,
/// Version of [`mirrord_protocol`] negotiated with the agent.
agent_protocol_version: Option<semver::Version>,
}

impl Interceptor {
Expand All @@ -97,8 +102,16 @@ impl Interceptor {
/// # Note
///
/// The socket can be replaced when retrying HTTP requests.
pub fn new(socket: TcpSocket, peer: SocketAddr) -> Self {
Self { socket, peer }
pub fn new(
socket: TcpSocket,
peer: SocketAddr,
agent_protocol_version: Option<semver::Version>,
) -> Self {
Self {
socket,
peer,
agent_protocol_version,
}
}
}

Expand Down Expand Up @@ -139,6 +152,7 @@ impl BackgroundTask for Interceptor {
let mut http_conn = HttpConnection {
sender,
peer: self.peer,
agent_protocol_version: self.agent_protocol_version.clone(),
};
let (response, on_upgrade) = http_conn.send(request).await?;
message_bus.send(MessageOut::Http(response)).await;
Expand Down Expand Up @@ -176,11 +190,27 @@ struct HttpConnection {
peer: SocketAddr,
/// Handle to the HTTP connection between the [`Interceptor`] the server.
sender: HttpSender,
/// Version of [`mirrord_protocol`] negotiated with the agent.
/// Determines which variant of [`LayerTcpSteal`](mirrord_protocol::tcp::LayerTcpSteal)
/// we use when sending HTTP responses.
agent_protocol_version: Option<semver::Version>,
}

impl HttpConnection {
/// Returns whether the agent supports
/// [`LayerTcpSteal::HttpResponseChunked`](mirrord_protocol::tcp::LayerTcpSteal::HttpResponseChunked).
pub fn agent_supports_streaming_response(&self) -> bool {
self.agent_protocol_version
.as_ref()
.map(|version| HTTP_CHUNKED_RESPONSE_VERSION.matches(version))
.unwrap_or(false)
}

/// Handles the result of sending an HTTP request.
/// Returns an [`HttpResponseFallback`] to be returned to the client or an [`InterceptorError`].
///
/// See [`HttpResponseFallback::response_from_request`] for notes on picking the correct
/// [`HttpResponseFallback`] variant.
async fn handle_response(
&self,
request: HttpRequestFallback,
Expand Down Expand Up @@ -209,6 +239,7 @@ impl HttpConnection {
request,
StatusCode::BAD_GATEWAY,
&body_message,
self.agent_protocol_version.as_ref(),
),
None,
))
Expand All @@ -224,6 +255,7 @@ impl HttpConnection {
request,
StatusCode::BAD_GATEWAY,
&body_message,
self.agent_protocol_version.as_ref(),
),
None,
))
Expand Down Expand Up @@ -257,7 +289,9 @@ impl HttpConnection {
.await
.map(HttpResponseFallback::Fallback)
}
HttpRequestFallback::Streamed(..) => {
HttpRequestFallback::Streamed(..)
if self.agent_supports_streaming_response() =>
{
HttpResponse::<ReceiverStreamBody>::from_hyper_response(
res,
self.peer.port(),
Expand All @@ -267,6 +301,16 @@ impl HttpConnection {
.await
.map(HttpResponseFallback::Streamed)
}
HttpRequestFallback::Streamed(..) => {
HttpResponse::<InternalHttpBody>::from_hyper_response(
res,
self.peer.port(),
request.connection_id(),
request.request_id(),
)
.await
.map(HttpResponseFallback::Framed)
}
};

Ok(result
Expand All @@ -283,6 +327,7 @@ impl HttpConnection {
request,
StatusCode::BAD_GATEWAY,
"mirrord",
self.agent_protocol_version.as_ref(),
),
None,
)
Expand Down Expand Up @@ -562,7 +607,15 @@ mod test {
let interceptor = {
let socket = TcpSocket::new_v4().unwrap();
socket.bind("127.0.0.1:0".parse().unwrap()).unwrap();
tasks.register(Interceptor::new(socket, local_destination), (), 8)
tasks.register(
Interceptor::new(
socket,
local_destination,
Some(mirrord_protocol::VERSION.clone()),
),
(),
8,
)
};

interceptor
Expand Down Expand Up @@ -638,7 +691,11 @@ mod test {
let mut tasks: BackgroundTasks<(), MessageOut, InterceptorError> = Default::default();
let socket = TcpSocket::new_v4().unwrap();
socket.bind("127.0.0.1:0".parse().unwrap()).unwrap();
let interceptor = Interceptor::new(socket, local_destination);
let interceptor = Interceptor::new(
socket,
local_destination,
Some(mirrord_protocol::VERSION.clone()),
);
let sender = tasks.register(interceptor, (), 8);

let (tx, rx) = tokio::sync::mpsc::channel(12);
Expand Down
2 changes: 1 addition & 1 deletion mirrord/protocol/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mirrord-protocol"
version = "1.8.0"
version = "1.8.1"
authors.workspace = true
description.workspace = true
documentation.workspace = true
Expand Down
57 changes: 49 additions & 8 deletions mirrord/protocol/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,18 +397,22 @@ impl HttpRequestFallback {
}
}

/// Minimal mirrord-protocol version that allows [`DaemonTcp::HttpRequestFramed`] instead of
/// [`DaemonTcp::HttpRequest`].
/// Minimal mirrord-protocol version that allows [`DaemonTcp::HttpRequestFramed`] and
/// [`LayerTcpSteal::HttpResponseFramed`].
pub static HTTP_FRAMED_VERSION: LazyLock<VersionReq> =
LazyLock::new(|| ">=1.3.0".parse().expect("Bad Identifier"));

/// Minimal mirrord-protocol version that allows [`DaemonTcp::HttpRequestChunked`] instead of
/// [`DaemonTcp::HttpRequest`].
pub static HTTP_CHUNKED_VERSION: LazyLock<VersionReq> =
/// Minimal mirrord-protocol version that allows [`DaemonTcp::HttpRequestChunked`].
pub static HTTP_CHUNKED_REQUEST_VERSION: LazyLock<VersionReq> =
LazyLock::new(|| ">=1.7.0".parse().expect("Bad Identifier"));

/// Minimal mirrord-protocol version that allows [`LayerTcpSteal::HttpResponseChunked`].
pub static HTTP_CHUNKED_RESPONSE_VERSION: LazyLock<VersionReq> =
LazyLock::new(|| ">=1.8.1".parse().expect("Bad Identifier"));

/// Minimal mirrord-protocol version that allows [`DaemonTcp::Data`] to be sent in the same
/// connection as [`DaemonTcp::HttpRequestFramed`] and [`DaemonTcp::HttpRequest`].
/// connection as
/// [`DaemonTcp::HttpRequestChunked`]/[`DaemonTcp::HttpRequestFramed`]/[`DaemonTcp::HttpRequest`].
pub static HTTP_FILTERED_UPGRADE_VERSION: LazyLock<VersionReq> =
LazyLock::new(|| ">=1.5.0".parse().expect("Bad Identifier"));

Expand Down Expand Up @@ -582,20 +586,57 @@ impl HttpResponseFallback {
}
}

/// Produces an [`HttpResponseFallback`] to the given [`HttpRequestFallback`].
///
/// # Note on picking response variant
///
/// Variant of returned [`HttpResponseFallback`] is picked based on the variant of given
/// [`HttpRequestFallback`] and agent protocol version. We need to consider both due
/// to:
/// 1. Old agent versions always responding with client's `mirrord_protocol` version to
/// [`ClientMessage::SwitchProtocolVersion`](super::ClientMessage::SwitchProtocolVersion),
/// 2. [`LayerTcpSteal::HttpResponseChunked`] being introduced after
/// [`DaemonTcp::HttpRequestChunked`].
pub fn response_from_request(
request: HttpRequestFallback,
status: StatusCode,
message: &str,
agent_protocol_version: Option<&semver::Version>,
) -> Self {
let agent_supports_streaming_response = agent_protocol_version
.map(|version| HTTP_CHUNKED_RESPONSE_VERSION.matches(version))
.unwrap_or(false);

match request {
// We received `DaemonTcp::HttpRequestFramed` from the agent,
// so we know it supports `LayerTcpSteal::HttpResponseFramed` (both were introduced in
// the same `mirrord_protocol` version).
HttpRequestFallback::Framed(request) => HttpResponseFallback::Framed(
HttpResponse::<InternalHttpBody>::response_from_request(request, status, message),
),

// We received `DaemonTcp::HttpRequest` from the agent, so we assume it only supports
// `LayerTcpSteal::HttpResponse`.
HttpRequestFallback::Fallback(request) => HttpResponseFallback::Fallback(
HttpResponse::<Vec<u8>>::response_from_request(request, status, message),
),
HttpRequestFallback::Streamed(request) => HttpResponseFallback::Streamed(
HttpResponse::<ReceiverStreamBody>::response_from_request(request, status, message),

// We received `DaemonTcp::HttpRequestChunked` and the agent supports
// `LayerTcpSteal::HttpResponseChunked`.
HttpRequestFallback::Streamed(request) if agent_supports_streaming_response => {
HttpResponseFallback::Streamed(
HttpResponse::<ReceiverStreamBody>::response_from_request(
request, status, message,
),
)
}

// We received `DaemonTcp::HttpRequestChunked` from the agent,
// but the agent does not support `LayerTcpSteal::HttpResponseChunked`.
// However, it must support the older `LayerTcpSteal::HttpResponseFramed`
// variant (was introduced before `DaemonTcp::HttpRequestChunked`).
HttpRequestFallback::Streamed(request) => HttpResponseFallback::Framed(
HttpResponse::<InternalHttpBody>::response_from_request(request, status, message),
),
}
}
Expand Down

0 comments on commit 2c2dfdc

Please sign in to comment.