Skip to content

Commit

Permalink
Add ping pong subtask to extproxy (#3039)
Browse files Browse the repository at this point in the history
* Add ping pong subtask to extproxy

* Update mirrord/cli/src/external_proxy.rs

Co-authored-by: t4lz <[email protected]>

* Tiny

---------

Co-authored-by: t4lz <[email protected]>
  • Loading branch information
DmitryDodzin and t4lz authored Jan 28, 2025
1 parent e64a17e commit ee5f936
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 1 deletion.
1 change: 1 addition & 0 deletions changelog.d/3030.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add ping pong subtask to extproxy to keep agent connection alive while it is up.
4 changes: 4 additions & 0 deletions mirrord/cli/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ pub(crate) enum ExternalProxyError {
)]
#[diagnostic(help("{GENERAL_BUG}"))]
MissingTlsInfo,

#[error("External proxy ping pong with the agent failed: {0}")]
#[diagnostic(help("{GENERAL_BUG}"))]
PingPongFailed(String),
}

/// Errors that can occur when executing the `mirrord intproxy` command.
Expand Down
55 changes: 54 additions & 1 deletion mirrord/cli/src/external_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use local_ip_address::local_ip;
use mirrord_analytics::{AnalyticsReporter, CollectAnalytics, Reporter};
use mirrord_config::LayerConfig;
use mirrord_intproxy::agent_conn::{AgentConnection, ConnectionTlsError};
use mirrord_protocol::DaemonCodec;
use mirrord_protocol::{ClientMessage, DaemonCodec, DaemonMessage, LogLevel, LogMessage};
use tokio::net::{TcpListener, TcpStream};
use tokio_rustls::server::TlsStream;
use tokio_util::{either::Either, sync::CancellationToken};
Expand Down Expand Up @@ -81,6 +81,10 @@ pub async fn proxy(listen_port: u16, watch: drain::Watch) -> CliResult<()> {
let mut analytics = AnalyticsReporter::new(config.telemetry, execution_kind, watch);
(&config).collect_analytics(analytics.get_mut());

// This connection is just to keep the agent alive as long as the client side is running.
let mut own_agent_conn =
connect_and_ping(&config, agent_connect_info.clone(), &mut analytics).await?;

let tls_acceptor = create_external_proxy_tls_acceptor(&config).await?;
let listener = create_listen_socket(SocketAddr::new(
local_ip().unwrap_or_else(|_| Ipv4Addr::UNSPECIFIED.into()),
Expand All @@ -101,6 +105,8 @@ pub async fn proxy(listen_port: u16, watch: drain::Watch) -> CliResult<()> {
config.external_proxy.start_idle_timeout,
)));

let mut ping_pong_ticker = tokio::time::interval(Duration::from_secs(30));

loop {
tokio::select! {
conn = listener.accept() => {
Expand Down Expand Up @@ -147,6 +153,53 @@ pub async fn proxy(listen_port: u16, watch: drain::Watch) -> CliResult<()> {
}
}

message = own_agent_conn.agent_rx.recv() => {
tracing::debug!(?message, "received message on own connection");

match message {
Some(DaemonMessage::Pong) => continue,
Some(DaemonMessage::LogMessage(LogMessage {
level: LogLevel::Error,
message,
})) => {
tracing::error!("agent log: {message}");
}
Some(DaemonMessage::LogMessage(LogMessage {
level: LogLevel::Warn,
message,
})) => {
tracing::warn!("agent log: {message}");
}
Some(DaemonMessage::Close(reason)) => {
return Err(
ExternalProxyError::PingPongFailed(format!(
"agent closed connection with message: {reason}"
)).into()
);
}
Some(message) => {
return Err(
ExternalProxyError::PingPongFailed(format!(
"agent sent an unexpected message: {message:?}"
)).into()
);
}
None => {
return Err(
ExternalProxyError::PingPongFailed(
"agent unexpectedly closed connection".to_string(),
).into()
);
}
}
}

_ = ping_pong_ticker.tick() => {
tracing::debug!("sending ping");

let _ = own_agent_conn.agent_tx.send(ClientMessage::Ping).await;
}

_ = initial_connection_timeout.as_mut(), if connections.load(Ordering::Relaxed) == 0 => {
tracing::debug!("closing listener due to initial connection timeout");

Expand Down

0 comments on commit ee5f936

Please sign in to comment.