diff --git a/changelog.d/3030.fixed.md b/changelog.d/3030.fixed.md new file mode 100644 index 00000000000..1dc6f8097bb --- /dev/null +++ b/changelog.d/3030.fixed.md @@ -0,0 +1 @@ +Add ping pong subtask to extproxy to keep agent connection alive while it is up. diff --git a/mirrord/cli/src/error.rs b/mirrord/cli/src/error.rs index 8426737da0b..45bff2a09b0 100644 --- a/mirrord/cli/src/error.rs +++ b/mirrord/cli/src/error.rs @@ -113,6 +113,10 @@ pub(crate) enum ExternalProxyError { )] #[diagnostic(help("{GENERAL_BUG}"))] MissingTlsInfo, + + #[error("External proxy ping pong with the agent failed: {0}")] + #[diagnostic(help("{GENERAL_BUG}"))] + PingPongFailed(String), } /// Errors that can occur when executing the `mirrord intproxy` command. diff --git a/mirrord/cli/src/external_proxy.rs b/mirrord/cli/src/external_proxy.rs index 34bf46b82b8..3ff76dd6bd1 100644 --- a/mirrord/cli/src/external_proxy.rs +++ b/mirrord/cli/src/external_proxy.rs @@ -36,7 +36,7 @@ use local_ip_address::local_ip; use mirrord_analytics::{AnalyticsReporter, CollectAnalytics, Reporter}; use mirrord_config::LayerConfig; use mirrord_intproxy::agent_conn::{AgentConnection, ConnectionTlsError}; -use mirrord_protocol::DaemonCodec; +use mirrord_protocol::{ClientMessage, DaemonCodec, DaemonMessage, LogLevel, LogMessage}; use tokio::net::{TcpListener, TcpStream}; use tokio_rustls::server::TlsStream; use tokio_util::{either::Either, sync::CancellationToken}; @@ -81,6 +81,10 @@ pub async fn proxy(listen_port: u16, watch: drain::Watch) -> CliResult<()> { let mut analytics = AnalyticsReporter::new(config.telemetry, execution_kind, watch); (&config).collect_analytics(analytics.get_mut()); + // This connection is just to keep the agent alive as long as the client side is running. + let mut own_agent_conn = + connect_and_ping(&config, agent_connect_info.clone(), &mut analytics).await?; + let tls_acceptor = create_external_proxy_tls_acceptor(&config).await?; let listener = create_listen_socket(SocketAddr::new( local_ip().unwrap_or_else(|_| Ipv4Addr::UNSPECIFIED.into()), @@ -101,6 +105,8 @@ pub async fn proxy(listen_port: u16, watch: drain::Watch) -> CliResult<()> { config.external_proxy.start_idle_timeout, ))); + let mut ping_pong_ticker = tokio::time::interval(Duration::from_secs(30)); + loop { tokio::select! { conn = listener.accept() => { @@ -147,6 +153,53 @@ pub async fn proxy(listen_port: u16, watch: drain::Watch) -> CliResult<()> { } } + message = own_agent_conn.agent_rx.recv() => { + tracing::debug!(?message, "received message on own connection"); + + match message { + Some(DaemonMessage::Pong) => continue, + Some(DaemonMessage::LogMessage(LogMessage { + level: LogLevel::Error, + message, + })) => { + tracing::error!("agent log: {message}"); + } + Some(DaemonMessage::LogMessage(LogMessage { + level: LogLevel::Warn, + message, + })) => { + tracing::warn!("agent log: {message}"); + } + Some(DaemonMessage::Close(reason)) => { + return Err( + ExternalProxyError::PingPongFailed(format!( + "agent closed connection with message: {reason}" + )).into() + ); + } + Some(message) => { + return Err( + ExternalProxyError::PingPongFailed(format!( + "agent sent an unexpected message: {message:?}" + )).into() + ); + } + None => { + return Err( + ExternalProxyError::PingPongFailed( + "agent unexpectedly closed connection".to_string(), + ).into() + ); + } + } + } + + _ = ping_pong_ticker.tick() => { + tracing::debug!("sending ping"); + + let _ = own_agent_conn.agent_tx.send(ClientMessage::Ping).await; + } + _ = initial_connection_timeout.as_mut(), if connections.load(Ordering::Relaxed) == 0 => { tracing::debug!("closing listener due to initial connection timeout");