From 49a8f05669370b00747449f4ed759e637e32a5d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Smolarek?= <34063647+Razz4780@users.noreply.github.com> Date: Wed, 28 Feb 2024 12:51:04 +0100 Subject: [PATCH] Larger outgoing buffers (#2279) * Increased read buffers to 64k * Changelog entry --- .../+increased-outgoing-buffers.changed.md | 1 + mirrord/agent/src/outgoing.rs | 8 ++++- .../src/proxies/outgoing/net_protocol_ext.rs | 32 +++++++++++-------- 3 files changed, 26 insertions(+), 15 deletions(-) create mode 100644 changelog.d/+increased-outgoing-buffers.changed.md diff --git a/changelog.d/+increased-outgoing-buffers.changed.md b/changelog.d/+increased-outgoing-buffers.changed.md new file mode 100644 index 00000000000..2473ce89da5 --- /dev/null +++ b/changelog.d/+increased-outgoing-buffers.changed.md @@ -0,0 +1 @@ +Increased size of buffers used by outgoing feature to read streams (from 4k to 64k in the agent, from 1k to 64k in the internal proxy). \ No newline at end of file diff --git a/mirrord/agent/src/outgoing.rs b/mirrord/agent/src/outgoing.rs index 19c7f51b588..731fa0d3265 100644 --- a/mirrord/agent/src/outgoing.rs +++ b/mirrord/agent/src/outgoing.rs @@ -44,6 +44,9 @@ pub(crate) struct TcpOutgoingApi { daemon_rx: Receiver, } +/// Buffer size for reading from the outgoing connections. +const READ_BUFFER_SIZE: usize = 64 * 1024; + #[tracing::instrument(level = "trace", skip(allocator, writers, readers, daemon_tx))] async fn layer_recv( layer_message: LayerTcpOutgoing, @@ -81,7 +84,10 @@ async fn layer_recv( // and writing from multiple hosts without blocking. let (read_half, write_half) = split(remote_stream); writers.insert(connection_id, write_half); - readers.insert(connection_id, ReaderStream::new(read_half)); + readers.insert( + connection_id, + ReaderStream::with_capacity(read_half, READ_BUFFER_SIZE), + ); Ok(DaemonConnect { connection_id, diff --git a/mirrord/intproxy/src/proxies/outgoing/net_protocol_ext.rs b/mirrord/intproxy/src/proxies/outgoing/net_protocol_ext.rs index 8c57de8bdae..f8b595b7199 100644 --- a/mirrord/intproxy/src/proxies/outgoing/net_protocol_ext.rs +++ b/mirrord/intproxy/src/proxies/outgoing/net_protocol_ext.rs @@ -7,6 +7,7 @@ use std::{ path::PathBuf, }; +use bytes::BytesMut; use mirrord_intproxy_protocol::NetProtocol; use mirrord_protocol::{ outgoing::{ @@ -154,22 +155,22 @@ impl PreparedSocket { /// Accepts one connection on this socket and returns a new socket for sending and receiving /// data. pub async fn accept(self) -> io::Result { - let (inner, is_really_connected, buf_size) = match self { + let (inner, is_really_connected) = match self { Self::TcpListener(listener) => { let (stream, _) = listener.accept().await?; - (InnerConnectedSocket::TcpStream(stream), true, 1024) + (InnerConnectedSocket::TcpStream(stream), true) } - Self::UdpSocket(socket) => (InnerConnectedSocket::UdpSocket(socket), false, 1500), + Self::UdpSocket(socket) => (InnerConnectedSocket::UdpSocket(socket), false), Self::UnixListener(listener) => { let (stream, _) = listener.accept().await?; - (InnerConnectedSocket::UnixStream(stream), true, 1024) + (InnerConnectedSocket::UnixStream(stream), true) } }; Ok(ConnectedSocket { inner, is_really_connected, - buffer: vec![0; buf_size], + buffer: BytesMut::with_capacity(64 * 1024), }) } } @@ -185,7 +186,7 @@ pub struct ConnectedSocket { inner: InnerConnectedSocket, /// Meaningful only when `inner` is [`InnerConnectedSocket::UdpSocket`]. is_really_connected: bool, - buffer: Vec, + buffer: BytesMut, } impl ConnectedSocket { @@ -223,19 +224,22 @@ impl ConnectedSocket { self.is_really_connected = true; } - let received = socket.recv(&mut self.buffer).await?; - - let bytes = self.buffer.get(..received).unwrap().to_vec(); - + socket.recv_buf(&mut self.buffer).await?; + let bytes = self.buffer.to_vec(); + self.buffer.clear(); Ok(bytes) } InnerConnectedSocket::TcpStream(stream) => { - let received = stream.read(&mut self.buffer).await?; - Ok(self.buffer.get(..received).unwrap().to_vec()) + stream.read_buf(&mut self.buffer).await?; + let bytes = self.buffer.to_vec(); + self.buffer.clear(); + Ok(bytes) } InnerConnectedSocket::UnixStream(stream) => { - let received = stream.read(&mut self.buffer).await?; - Ok(self.buffer.get(..received).unwrap().to_vec()) + stream.read_buf(&mut self.buffer).await?; + let bytes = self.buffer.to_vec(); + self.buffer.clear(); + Ok(bytes) } } }