Skip to content

Commit

Permalink
Improve mirroring for high stress (#2561)
Browse files: browse the repository at this point in the history
* Some opts

* No blocking in the sniffer loop

* Clippy

* Changelog entry

* Docs

* Sniffer messages docs

* TcpSessionIdentifier visibility

* Sniffer submodules visibility

* TcpConnectionSniffer docs

* Update mirrord/agent/src/sniffer/messages.rs

Co-authored-by: meowjesty <[email protected]>

* Update mirrord/agent/src/sniffer.rs

Co-authored-by: meowjesty <[email protected]>

* Update mirrord/agent/src/sniffer.rs

Co-authored-by: meowjesty <[email protected]>

* CR style suggestions

* CR style suggestion

* What the hell is not working?

* Docs fixed

* Hell yeah

---------

Co-authored-by: meowjesty <[email protected]>
  • Loading branch information
Razz4780 and meowjesty authored Jul 2, 2024
1 parent 63b3eed commit dbe58ed
Show file tree
Hide file tree
Showing 11 changed files with 1,207 additions and 478 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ actix-codec = "0.5"
bincode = { version = "2.0.0-rc.2", features = ["serde"] }
bytes = "1"
tokio = { version = "1" }
tokio-stream = "0.1"
tokio-stream = { version = "0.1", features = ["sync"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
anyhow = "1"
Expand Down
1 change: 1 addition & 0 deletions changelog.d/2529.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improved agent performance when mirroring is under high load.
27 changes: 19 additions & 8 deletions mirrord/agent/src/entrypoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use client_connection::AgentTlsConnector;
use dns::{DnsCommand, DnsWorker};
use futures::TryFutureExt;
use mirrord_protocol::{ClientMessage, DaemonMessage, GetEnvVarsRequest, LogMessage};
use sniffer::tcp_capture::RawSocketTcpCapture;
use tokio::{
net::{TcpListener, TcpStream},
process::Command,
Expand All @@ -35,7 +36,7 @@ use crate::{
file::FileManager,
outgoing::{TcpOutgoingApi, UdpOutgoingApi},
runtime::get_container,
sniffer::{SnifferCommand, TcpConnectionSniffer, TcpSnifferApi},
sniffer::{api::TcpSnifferApi, messages::SnifferCommand, TcpConnectionSniffer},
steal::{
ip_tables::{
new_iptables, IPTablesWrapper, SafeIpTables, IPTABLE_MESH, IPTABLE_MESH_ENV,
Expand All @@ -48,8 +49,7 @@ use crate::{
*,
};

/// Size of [`mpsc`] channels connecting [`TcpStealerApi`] and [`TcpSnifferApi`] with their
/// background tasks.
/// Size of [`mpsc`] channels connecting [`TcpStealerApi`] with the background task.
const CHANNEL_SIZE: usize = 1024;

/// Keeps track of next client id.
Expand Down Expand Up @@ -201,6 +201,8 @@ struct ClientConnectionHandler {
udp_outgoing_api: UdpOutgoingApi,
dns_api: DnsApi,
state: State,
/// Whether the client has sent us [`ClientMessage::ReadyForLogs`].
ready_for_logs: bool,
}

impl ClientConnectionHandler {
Expand Down Expand Up @@ -233,6 +235,7 @@ impl ClientConnectionHandler {
udp_outgoing_api,
dns_api,
state,
ready_for_logs: false,
};

Ok(client_handler)
Expand All @@ -244,7 +247,7 @@ impl ClientConnectionHandler {
connection: &mut ClientConnection,
) -> Option<TcpSnifferApi> {
if let BackgroundTask::Running(sniffer_status, sniffer_sender) = task {
match TcpSnifferApi::new(id, sniffer_sender, sniffer_status, CHANNEL_SIZE).await {
match TcpSnifferApi::new(id, sniffer_sender, sniffer_status).await {
Ok(api) => Some(api),
Err(e) => {
let message = format!(
Expand Down Expand Up @@ -338,7 +341,13 @@ impl ClientConnectionHandler {
unreachable!()
}
}, if self.tcp_sniffer_api.is_some() => match message {
Ok(message) => self.respond(DaemonMessage::Tcp(message)).await?,
Ok((message, Some(log))) if self.ready_for_logs => {
self.respond(DaemonMessage::LogMessage(log)).await?;
self.respond(DaemonMessage::Tcp(message)).await?;
}
Ok((message, _)) => {
self.respond(DaemonMessage::Tcp(message)).await?;
},
Err(e) => break e,
},
message = async {
Expand Down Expand Up @@ -461,7 +470,9 @@ impl ClientConnectionHandler {
))
.await?;
}
ClientMessage::ReadyForLogs => {}
ClientMessage::ReadyForLogs => {
self.ready_for_logs = true;
}
}

Ok(true)
Expand Down Expand Up @@ -498,7 +509,7 @@ async fn start_agent(args: Args) -> Result<()> {
let mesh = args.mode.mesh();

let watched_task = WatchedTask::new(
TcpConnectionSniffer::TASK_NAME,
TcpConnectionSniffer::<RawSocketTcpCapture>::TASK_NAME,
TcpConnectionSniffer::new(sniffer_command_rx, args.network_interface, mesh).and_then(
|sniffer| async move {
let res = sniffer.start(cancellation_token).await;
Expand All @@ -512,7 +523,7 @@ async fn start_agent(args: Args) -> Result<()> {
let status = watched_task.status();
let task = run_thread_in_namespace(
watched_task.start(),
TcpConnectionSniffer::TASK_NAME.to_string(),
TcpConnectionSniffer::<RawSocketTcpCapture>::TASK_NAME.to_string(),
state.container_pid(),
"net",
);
Expand Down
5 changes: 4 additions & 1 deletion mirrord/agent/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use mirrord_protocol::{
use thiserror::Error;

use crate::{
client_connection::TlsSetupError, namespace::NamespaceError, sniffer::SnifferCommand,
client_connection::TlsSetupError, namespace::NamespaceError, sniffer::messages::SnifferCommand,
steal::StealerCommand,
};

Expand Down Expand Up @@ -135,6 +135,9 @@ pub(crate) enum AgentError {
/// Child agent process spawned in `main` failed.
#[error("Agent child process failed: {0}")]
AgentFailed(ExitStatus),

#[error("Exhausted possible identifiers for incoming connections.")]
ExhaustedConnectionId,
}

pub(crate) type Result<T, E = AgentError> = std::result::Result<T, E>;
1 change: 1 addition & 0 deletions mirrord/agent/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![feature(hash_extract_if)]
#![feature(let_chains)]
#![feature(type_alias_impl_trait)]
#![feature(entry_insert)]
#![cfg_attr(target_os = "linux", feature(tcp_quickack))]
#![feature(lazy_cell)]
#![warn(clippy::indexing_slicing)]
Expand Down
Loading

0 comments on commit dbe58ed

Please sign in to comment.