Skip to content

Commit

Permalink
Larger outgoing buffers (#2279)
Browse files Browse the repository at this point in the history
* Increased read buffers to 64k

* Changelog entry
  • Loading branch information
Razz4780 authored Feb 28, 2024
1 parent cff119d commit 49a8f05
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 15 deletions.
1 change: 1 addition & 0 deletions changelog.d/+increased-outgoing-buffers.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Increased size of buffers used by outgoing feature to read streams (from 4k to 64k in the agent, from 1k to 64k in the internal proxy).
8 changes: 7 additions & 1 deletion mirrord/agent/src/outgoing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub(crate) struct TcpOutgoingApi {
daemon_rx: Receiver<Daemon>,
}

/// Buffer size for reading from the outgoing connections.
const READ_BUFFER_SIZE: usize = 64 * 1024;

#[tracing::instrument(level = "trace", skip(allocator, writers, readers, daemon_tx))]
async fn layer_recv(
layer_message: LayerTcpOutgoing,
Expand Down Expand Up @@ -81,7 +84,10 @@ async fn layer_recv(
// and writing from multiple hosts without blocking.
let (read_half, write_half) = split(remote_stream);
writers.insert(connection_id, write_half);
readers.insert(connection_id, ReaderStream::new(read_half));
readers.insert(
connection_id,
ReaderStream::with_capacity(read_half, READ_BUFFER_SIZE),
);

Ok(DaemonConnect {
connection_id,
Expand Down
32 changes: 18 additions & 14 deletions mirrord/intproxy/src/proxies/outgoing/net_protocol_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
path::PathBuf,
};

use bytes::BytesMut;
use mirrord_intproxy_protocol::NetProtocol;
use mirrord_protocol::{
outgoing::{
Expand Down Expand Up @@ -154,22 +155,22 @@ impl PreparedSocket {
/// Accepts one connection on this socket and returns a new socket for sending and receiving
/// data.
pub async fn accept(self) -> io::Result<ConnectedSocket> {
let (inner, is_really_connected, buf_size) = match self {
let (inner, is_really_connected) = match self {
Self::TcpListener(listener) => {
let (stream, _) = listener.accept().await?;
(InnerConnectedSocket::TcpStream(stream), true, 1024)
(InnerConnectedSocket::TcpStream(stream), true)
}
Self::UdpSocket(socket) => (InnerConnectedSocket::UdpSocket(socket), false, 1500),
Self::UdpSocket(socket) => (InnerConnectedSocket::UdpSocket(socket), false),
Self::UnixListener(listener) => {
let (stream, _) = listener.accept().await?;
(InnerConnectedSocket::UnixStream(stream), true, 1024)
(InnerConnectedSocket::UnixStream(stream), true)
}
};

Ok(ConnectedSocket {
inner,
is_really_connected,
buffer: vec![0; buf_size],
buffer: BytesMut::with_capacity(64 * 1024),
})
}
}
Expand All @@ -185,7 +186,7 @@ pub struct ConnectedSocket {
inner: InnerConnectedSocket,
/// Meaningful only when `inner` is [`InnerConnectedSocket::UdpSocket`].
is_really_connected: bool,
buffer: Vec<u8>,
buffer: BytesMut,
}

impl ConnectedSocket {
Expand Down Expand Up @@ -223,19 +224,22 @@ impl ConnectedSocket {
self.is_really_connected = true;
}

let received = socket.recv(&mut self.buffer).await?;

let bytes = self.buffer.get(..received).unwrap().to_vec();

socket.recv_buf(&mut self.buffer).await?;
let bytes = self.buffer.to_vec();
self.buffer.clear();
Ok(bytes)
}
InnerConnectedSocket::TcpStream(stream) => {
let received = stream.read(&mut self.buffer).await?;
Ok(self.buffer.get(..received).unwrap().to_vec())
stream.read_buf(&mut self.buffer).await?;
let bytes = self.buffer.to_vec();
self.buffer.clear();
Ok(bytes)
}
InnerConnectedSocket::UnixStream(stream) => {
let received = stream.read(&mut self.buffer).await?;
Ok(self.buffer.get(..received).unwrap().to_vec())
stream.read_buf(&mut self.buffer).await?;
let bytes = self.buffer.to_vec();
self.buffer.clear();
Ok(bytes)
}
}
}
Expand Down

0 comments on commit 49a8f05

Please sign in to comment.