From a797c184f267eb644129efb7e7a5ef5bdd4bc6d2 Mon Sep 17 00:00:00 2001 From: Dmitry Dodzin Date: Thu, 2 Jan 2025 16:42:57 +0200 Subject: [PATCH 01/20] Naive fix? --- .../+update-single-portforward.changed.md | 1 + .../kube/src/api/kubernetes/portforwarder.rs | 19 +++++++++++-------- 2 files changed, 12 insertions(+), 8 deletions(-) create mode 100644 changelog.d/+update-single-portforward.changed.md diff --git a/changelog.d/+update-single-portforward.changed.md b/changelog.d/+update-single-portforward.changed.md new file mode 100644 index 00000000000..b0a3a984dd8 --- /dev/null +++ b/changelog.d/+update-single-portforward.changed.md @@ -0,0 +1 @@ +Add a naive update to our port forward wrapper to force checking the error channel first, instead of randomly picking a branch in the `tokio::select!` impl. diff --git a/mirrord/kube/src/api/kubernetes/portforwarder.rs b/mirrord/kube/src/api/kubernetes/portforwarder.rs index bd5e6d8c983..c8f40e48598 100644 --- a/mirrord/kube/src/api/kubernetes/portforwarder.rs +++ b/mirrord/kube/src/api/kubernetes/portforwarder.rs @@ -162,18 +162,19 @@ impl SinglePortForwarder { loop { tokio::select! 
{ + biased; + error = error_future.as_mut() => { + if let Some(error) = error { + tracing::warn!(?connect_info, %error, "error while performing port-forward"); + } - if retry_strategy.peek().is_none() || error.is_none() { + if retry_strategy.peek().is_none() { tracing::warn!(?connect_info, "finished retry strategy, closing connection"); break; } - if let Some(error) = error { - tracing::warn!(?connect_info, %error, "error while performing port-forward, retrying"); - } - match create_portforward_streams(&pod_api, &connect_info, &mut retry_strategy).await { Ok((next_stream, next_error_future)) => { let _ = stream.shutdown().await; @@ -193,9 +194,11 @@ impl SinglePortForwarder { copy_result = tokio::io::copy_bidirectional(&mut stream, &mut sink) => { if let Err(error) = copy_result { tracing::error!(?connect_info, %error, "unable to copy_bidirectional agent stream to local sink"); - - break; + } else { + tracing::info!(?connect_info, "closed copy_bidirectional agent stream to local sink"); } + + break; } } } @@ -208,7 +211,7 @@ pub async fn retry_portforward( client: &Client, connect_info: AgentKubernetesConnectInfo, ) -> Result<(Box, SinglePortForwarder)> { - let (lhs, rhs) = tokio::io::duplex(4096); + let (lhs, rhs) = tokio::io::duplex(1024 * 1024); let port_forwarder = SinglePortForwarder::connect(client, connect_info, Box::new(rhs)).await?; From 9101e84a98448c9eecf650c9f70b6009c5694a7d Mon Sep 17 00:00:00 2001 From: Dmitry Dodzin Date: Thu, 2 Jan 2025 17:02:16 +0200 Subject: [PATCH 02/20] Docs --- mirrord/kube/src/api/kubernetes/portforwarder.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/mirrord/kube/src/api/kubernetes/portforwarder.rs b/mirrord/kube/src/api/kubernetes/portforwarder.rs index c8f40e48598..a446c496fbe 100644 --- a/mirrord/kube/src/api/kubernetes/portforwarder.rs +++ b/mirrord/kube/src/api/kubernetes/portforwarder.rs @@ -211,6 +211,7 @@ pub async fn retry_portforward( client: &Client, connect_info: AgentKubernetesConnectInfo, ) -> Result<(Box, 
SinglePortForwarder)> { + // use 1024 * 1024 to be identical to `kube` implementation (https://github.com/kube-rs/kube/blob/ecbdafc214538aadc78ec8447f2fa12d0057492b/kube-client/src/api/portforward.rs#L101) let (lhs, rhs) = tokio::io::duplex(1024 * 1024); let port_forwarder = SinglePortForwarder::connect(client, connect_info, Box::new(rhs)).await?; From 2353fbbd88e408c11a161fb708ef72a05bfa4983 Mon Sep 17 00:00:00 2001 From: Dmitry Dodzin Date: Tue, 21 Jan 2025 12:34:49 +0200 Subject: [PATCH 03/20] x-reconnect-id --- Cargo.lock | 1 + mirrord/cli/src/internal_proxy.rs | 2 +- mirrord/cli/src/main.rs | 2 +- mirrord/intproxy/Cargo.toml | 1 + mirrord/intproxy/src/agent_conn.rs | 147 ++++++++++++++---- .../intproxy/src/agent_conn/portforward.rs | 25 +++ mirrord/intproxy/src/background_tasks.rs | 80 +++++++++- mirrord/intproxy/src/layer_conn.rs | 2 +- mirrord/intproxy/src/layer_initializer.rs | 2 +- mirrord/intproxy/src/lib.rs | 13 +- mirrord/intproxy/src/ping_pong.rs | 2 +- mirrord/intproxy/src/proxies/files.rs | 2 +- mirrord/intproxy/src/proxies/incoming.rs | 2 +- .../src/proxies/incoming/interceptor.rs | 15 +- mirrord/intproxy/src/proxies/outgoing.rs | 2 +- .../src/proxies/outgoing/interceptor.rs | 14 +- mirrord/intproxy/src/proxies/simple.rs | 2 +- .../kube/src/api/kubernetes/portforwarder.rs | 4 + mirrord/operator/src/client.rs | 47 ++++-- mirrord/operator/src/client/upgrade.rs | 30 +++- 20 files changed, 326 insertions(+), 69 deletions(-) create mode 100644 mirrord/intproxy/src/agent_conn/portforward.rs diff --git a/Cargo.lock b/Cargo.lock index 2cfeea353a5..1d0c9ba66ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4307,6 +4307,7 @@ dependencies = [ "serde", "thiserror 2.0.9", "tokio", + "tokio-retry", "tokio-rustls 0.26.1", "tokio-stream", "tracing", diff --git a/mirrord/cli/src/internal_proxy.rs b/mirrord/cli/src/internal_proxy.rs index 1c8763a0d92..e17a981cb12 100644 --- a/mirrord/cli/src/internal_proxy.rs +++ b/mirrord/cli/src/internal_proxy.rs @@ -161,7 +161,7 @@ 
pub(crate) async fn connect_and_ping( connect_info: Option, analytics: &mut AnalyticsReporter, ) -> CliResult { - let mut agent_conn = AgentConnection::new(config, connect_info, analytics) + let mut agent_conn = AgentConnection::new(config, connect_info, None, analytics) .await .map_err(IntProxyError::from)?; diff --git a/mirrord/cli/src/main.rs b/mirrord/cli/src/main.rs index 27923cab4fd..e23836b6c43 100644 --- a/mirrord/cli/src/main.rs +++ b/mirrord/cli/src/main.rs @@ -591,7 +591,7 @@ async fn port_forward(args: &PortForwardArgs, watch: drain::Watch) -> CliResult< // errors from AgentConnection::new get mapped to CliError manually to prevent unreadably long // error print-outs - let agent_conn = AgentConnection::new(&config, Some(connection_info), &mut analytics) + let agent_conn = AgentConnection::new(&config, Some(connection_info), None, &mut analytics) .await .map_err(|agent_con_error| match agent_con_error { AgentConnectionError::Io(error) => CliError::PortForwardingSetupError(error.into()), diff --git a/mirrord/intproxy/Cargo.toml b/mirrord/intproxy/Cargo.toml index ede6a260c02..57f7fdedab0 100644 --- a/mirrord/intproxy/Cargo.toml +++ b/mirrord/intproxy/Cargo.toml @@ -40,6 +40,7 @@ http-body-util.workspace = true bytes.workspace = true futures.workspace = true rand.workspace = true +tokio-retry = "0.3" tokio-rustls.workspace = true rustls.workspace = true rustls-pemfile.workspace = true diff --git a/mirrord/intproxy/src/agent_conn.rs b/mirrord/intproxy/src/agent_conn.rs index 1d982778411..c4aa2cbb96d 100644 --- a/mirrord/intproxy/src/agent_conn.rs +++ b/mirrord/intproxy/src/agent_conn.rs @@ -6,20 +6,21 @@ use std::{ io, io::BufReader, net::{IpAddr, SocketAddr}, + ops::ControlFlow, path::{Path, PathBuf}, sync::Arc, + time::Duration, }; -use mirrord_analytics::Reporter; +use mirrord_analytics::{NullReporter, Reporter}; use mirrord_config::LayerConfig; use mirrord_kube::{ - api::{ - kubernetes::{AgentKubernetesConnectInfo, KubernetesAPI}, - wrap_raw_connection, 
- }, + api::{kubernetes::AgentKubernetesConnectInfo, wrap_raw_connection}, error::KubeApiError, }; -use mirrord_operator::client::{error::OperatorApiError, OperatorApi, OperatorSession}; +use mirrord_operator::client::{ + error::OperatorApiError, OperatorApi, OperatorSession, WebSocketStreamId, +}; use mirrord_protocol::{ClientMessage, DaemonMessage}; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -30,14 +31,20 @@ use tokio::{ mpsc::{Receiver, Sender}, }, }; +use tokio_retry::{ + strategy::{jitter, ExponentialBackoff}, + Retry, +}; use tokio_rustls::TlsConnector; use tracing::Level; use crate::{ - background_tasks::{BackgroundTask, MessageBus}, + background_tasks::{BackgroundTask, MessageBus, RestartableBackgroundTask}, ProxyMessage, }; +mod portforward; + /// Errors that can occur when the internal proxy tries to establish a connection with the agent. #[derive(Error, Debug)] pub enum AgentConnectionError { @@ -97,6 +104,18 @@ pub enum AgentConnectInfo { DirectKubernetes(AgentKubernetesConnectInfo), } +#[derive(Debug, Default)] +pub enum ReconnectParams { + ConnectInfo { + config: LayerConfig, + connect_info: AgentConnectInfo, + websocket_stream_id: WebSocketStreamId, + }, + + #[default] + Break, +} + /// Handles logic of the `proxy <-> agent` connection as a [`BackgroundTask`]. 
/// /// # Note @@ -107,6 +126,7 @@ pub enum AgentConnectInfo { pub struct AgentConnection { pub agent_tx: Sender, pub agent_rx: Receiver, + pub reconnect: ReconnectParams, } impl AgentConnection { @@ -115,13 +135,27 @@ impl AgentConnection { pub async fn new( config: &LayerConfig, connect_info: Option, + websocket_stream_id: Option, analytics: &mut R, ) -> Result { - let (agent_tx, agent_rx) = match connect_info { + let (agent_tx, agent_rx, reconnect) = match connect_info { Some(AgentConnectInfo::Operator(session)) => { - let connection = - OperatorApi::connect_in_existing_session(config, session, analytics).await?; - (connection.tx, connection.rx) + let connection = OperatorApi::connect_in_existing_session( + config, + session.clone(), + websocket_stream_id, + analytics, + ) + .await?; + ( + connection.tx, + connection.rx, + ReconnectParams::ConnectInfo { + config: config.clone(), + connect_info: AgentConnectInfo::Operator(session), + websocket_stream_id: connection.stream_id, + }, + ) } Some(AgentConnectInfo::ExternalProxy(proxy_addr)) => { @@ -131,7 +165,7 @@ impl AgentConnection { let stream = socket.connect(proxy_addr).await?; - if config.external_proxy.tls_enable + let (tx, rx) = if config.external_proxy.tls_enable && let ( Some(tls_certificate), Some(client_tls_certificate), @@ -140,8 +174,7 @@ impl AgentConnection { config.external_proxy.tls_certificate.as_ref(), config.internal_proxy.client_tls_certificate.as_ref(), config.internal_proxy.client_tls_key.as_ref(), - ) - { + ) { wrap_connection_with_tls( stream, proxy_addr.ip(), @@ -152,20 +185,14 @@ impl AgentConnection { .await? 
} else { wrap_raw_connection(stream) - } + }; + + (tx, rx, ReconnectParams::default()) } Some(AgentConnectInfo::DirectKubernetes(connect_info)) => { - let k8s_api = KubernetesAPI::create(config) - .await - .map_err(AgentConnectionError::Kube)?; - - let stream = k8s_api - .create_connection_portforward(connect_info.clone()) - .await - .map_err(AgentConnectionError::Kube)?; - - wrap_raw_connection(stream) + let (tx, rx) = portforward::create_connection(config, connect_info.clone()).await?; + (tx, rx, ReconnectParams::default()) } None => { @@ -174,18 +201,27 @@ impl AgentConnection { .as_ref() .ok_or(AgentConnectionError::NoConnectionMethod)?; let stream = TcpStream::connect(address).await?; - wrap_raw_connection(stream) + let (tx, rx) = wrap_raw_connection(stream); + (tx, rx, ReconnectParams::default()) } }; - Ok(Self { agent_tx, agent_rx }) + Ok(Self { + agent_tx, + agent_rx, + reconnect, + }) } pub async fn new_for_raw_address(address: SocketAddr) -> Result { let stream = TcpStream::connect(address).await?; let (agent_tx, agent_rx) = wrap_raw_connection(stream); - Ok(Self { agent_tx, agent_rx }) + Ok(Self { + agent_tx, + agent_rx, + reconnect: ReconnectParams::Break, + }) } #[tracing::instrument(level = Level::TRACE, name = "send_agent_message", skip(self), ret)] @@ -205,7 +241,9 @@ impl BackgroundTask for AgentConnection { type MessageIn = ClientMessage; type MessageOut = ProxyMessage; - async fn run(mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { + async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { + let mut sleep = Box::pin(tokio::time::sleep(Duration::from_secs(30))); + loop { tokio::select! 
{ msg = message_bus.recv() => match msg { @@ -227,6 +265,57 @@ impl BackgroundTask for AgentConnection { break Err(AgentChannelError); } Some(msg) => message_bus.send(ProxyMessage::FromAgent(msg)).await, + }, + + _ = sleep.as_mut() => { + break Err(AgentChannelError); + } + } + } + } +} + +impl RestartableBackgroundTask for AgentConnection { + #[tracing::instrument(level = Level::TRACE, skip(self, _bus), ret)] + async fn restart( + &mut self, + error: Self::Error, + _bus: &mut MessageBus, + ) -> ControlFlow { + match &self.reconnect { + ReconnectParams::Break => ControlFlow::Break(error), + ReconnectParams::ConnectInfo { + config, + connect_info, + websocket_stream_id, + } => { + let retry_strategy = ExponentialBackoff::from_millis(50).map(jitter).take(10); + + let connection = Retry::spawn(retry_strategy, || async move { + AgentConnection::new( + config, + connect_info.clone().into(), + Some(websocket_stream_id.clone()), + &mut NullReporter::default(), + ) + .await + .inspect_err( + |err| tracing::error!(error = ?err, "unable to connect to agent upon retry"), + ) + }) + .await; + + match connection { + Ok(connection) => { + *self = connection; + + ControlFlow::Continue(()) + } + Err(error) => { + tracing::error!(?error, "unable to reconnect agent"); + + ControlFlow::Break(AgentChannelError) + } } } } diff --git a/mirrord/intproxy/src/agent_conn/portforward.rs b/mirrord/intproxy/src/agent_conn/portforward.rs new file mode 100644 index 00000000000..9632323cc1f --- /dev/null +++ b/mirrord/intproxy/src/agent_conn/portforward.rs @@ -0,0 +1,25 @@ +use mirrord_config::LayerConfig; +use mirrord_kube::api::{ + kubernetes::{AgentKubernetesConnectInfo, KubernetesAPI}, + wrap_raw_connection, +}; +use mirrord_protocol::{ClientMessage, DaemonMessage}; +use tokio::sync::mpsc; + +use crate::agent_conn::AgentConnectionError; + +pub async fn create_connection( + config: &LayerConfig, + connect_info: AgentKubernetesConnectInfo, +) -> Result<(mpsc::Sender, mpsc::Receiver), 
AgentConnectionError> { + let k8s_api = KubernetesAPI::create(config) + .await + .map_err(AgentConnectionError::Kube)?; + + let stream = k8s_api + .create_connection_portforward(connect_info.clone()) + .await + .map_err(AgentConnectionError::Kube)?; + + Ok(wrap_raw_connection(stream)) +} diff --git a/mirrord/intproxy/src/background_tasks.rs b/mirrord/intproxy/src/background_tasks.rs index e43c8f306b8..4eacb0d5fa8 100644 --- a/mirrord/intproxy/src/background_tasks.rs +++ b/mirrord/intproxy/src/background_tasks.rs @@ -6,7 +6,7 @@ //! Each background task implement the [`BackgroundTask`] trait, which specifies its properties and //! allows for managing groups of related tasks with one [`BackgroundTasks`] instance. -use std::{collections::HashMap, fmt, future::Future, hash::Hash}; +use std::{collections::HashMap, fmt, future::Future, hash::Hash, ops::ControlFlow}; use tokio::{ sync::mpsc::{self, Receiver, Sender}, @@ -35,6 +35,13 @@ impl MessageBus { msg = self.rx.recv() => msg, } } + + pub fn cast(&mut self) -> &mut MessageBus + where + R: BackgroundTask, + { + unsafe { &mut *(self as *mut MessageBus as *mut MessageBus) } + } } /// Common trait for all background tasks in the internal proxy. @@ -51,11 +58,63 @@ pub trait BackgroundTask: Sized { /// When the [`MessageBus`] has no more messages to be consumed, the task should exit without /// errors. 
fn run( - self, + &mut self, message_bus: &mut MessageBus, ) -> impl Future> + Send; } +pub trait RestartableBackgroundTask: BackgroundTask { + fn restart( + &mut self, + run_error: Self::Error, + message_bus: &mut MessageBus, + ) -> impl Future> + Send; +} + +pub struct RestartableBackgroundTaskWrapper { + task: T, +} + +impl BackgroundTask for RestartableBackgroundTaskWrapper +where + T: RestartableBackgroundTask + Send, + T::MessageIn: Send, + T::MessageOut: Send, + T::Error: Send, +{ + type Error = T::Error; + type MessageIn = T::MessageIn; + type MessageOut = T::MessageOut; + + async fn run(&mut self, bus: &mut MessageBus) -> Result<(), Self::Error> { + let RestartableBackgroundTaskWrapper { task } = self; + let task_bus = bus.cast(); + + match task.run(task_bus).await { + Err(run_error) => { + let mut run_error = Some(run_error); + + loop { + match task + .restart(run_error.take().expect("should contain an error"), task_bus) + .await + { + ControlFlow::Break(err) => return Err(err), + ControlFlow::Continue(()) => { + if let Err(err) = task.run(task_bus).await { + run_error = Some(err); + } else { + return Ok(()); + } + } + } + } + } + Ok(()) => Ok(()), + } + } +} + /// A struct for managing groups of related [`BackgroundTasks`]. /// Tasks managed with a single instance of this struct must produce messages of the same type /// `MOut` and return errors convertible to `Err`. @@ -92,7 +151,7 @@ where /// # Panics /// /// This method panics when attempting to register a task with a duplicate id. 
- pub fn register(&mut self, task: T, id: Id, channel_size: usize) -> TaskSender + pub fn register(&mut self, mut task: T, id: Id, channel_size: usize) -> TaskSender where T: 'static + BackgroundTask + Send, Err: From, @@ -123,6 +182,21 @@ where TaskSender(in_msg_tx) } + pub fn register_restartable( + &mut self, + task: T, + id: Id, + channel_size: usize, + ) -> TaskSender> + where + T: 'static + RestartableBackgroundTask + Send, + Err: From, + T::MessageIn: Send, + T::Error: Send, + { + self.register(RestartableBackgroundTaskWrapper { task }, id, channel_size) + } + /// Returns the next update from one of registered tasks. pub async fn next(&mut self) -> Option<(Id, TaskUpdate)> { let (id, msg) = self.streams.next().await?; diff --git a/mirrord/intproxy/src/layer_conn.rs b/mirrord/intproxy/src/layer_conn.rs index 0cdd1c792a0..22178e39e0a 100644 --- a/mirrord/intproxy/src/layer_conn.rs +++ b/mirrord/intproxy/src/layer_conn.rs @@ -51,7 +51,7 @@ impl BackgroundTask for LayerConnection { type MessageIn = LocalMessage; type MessageOut = ProxyMessage; - async fn run(mut self, message_bus: &mut MessageBus) -> Result<(), CodecError> { + async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), CodecError> { loop { tokio::select! { res = self.layer_codec_rx.receive() => match res { diff --git a/mirrord/intproxy/src/layer_initializer.rs b/mirrord/intproxy/src/layer_initializer.rs index 0ca702b7d7d..ff793b81284 100644 --- a/mirrord/intproxy/src/layer_initializer.rs +++ b/mirrord/intproxy/src/layer_initializer.rs @@ -92,7 +92,7 @@ impl BackgroundTask for LayerInitializer { type MessageIn = (); type MessageOut = ProxyMessage; - async fn run(mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { + async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { loop { tokio::select! 
{ None = message_bus.recv() => { diff --git a/mirrord/intproxy/src/lib.rs b/mirrord/intproxy/src/lib.rs index 7dd396c9eb0..759d873a0bf 100644 --- a/mirrord/intproxy/src/lib.rs +++ b/mirrord/intproxy/src/lib.rs @@ -22,7 +22,9 @@ use tokio::{net::TcpListener, time}; use tracing::Level; use crate::{ - agent_conn::AgentConnection, background_tasks::TaskError, error::IntProxyError, + agent_conn::AgentConnection, + background_tasks::{RestartableBackgroundTaskWrapper, TaskError}, + error::IntProxyError, main_tasks::LayerClosed, }; @@ -41,7 +43,7 @@ mod request_queue; struct TaskTxs { layers: HashMap>, _layer_initializer: TaskSender, - agent: TaskSender, + agent: TaskSender>, simple: TaskSender, outgoing: TaskSender, incoming: TaskSender, @@ -77,8 +79,11 @@ impl IntProxy { let mut background_tasks: BackgroundTasks = Default::default(); - let agent = - background_tasks.register(agent_conn, MainTaskId::AgentConnection, Self::CHANNEL_SIZE); + let agent = background_tasks.register_restartable( + agent_conn, + MainTaskId::AgentConnection, + Self::CHANNEL_SIZE, + ); let layer_initializer = background_tasks.register( LayerInitializer::new(listener), MainTaskId::LayerInitializer, diff --git a/mirrord/intproxy/src/ping_pong.rs b/mirrord/intproxy/src/ping_pong.rs index f83f0c696a2..e7723677657 100644 --- a/mirrord/intproxy/src/ping_pong.rs +++ b/mirrord/intproxy/src/ping_pong.rs @@ -66,7 +66,7 @@ impl BackgroundTask for PingPong { /// /// When the time comes to ping the agent and the previous ping was not answered, this task /// exits with an error. - async fn run(mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { + async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { loop { tokio::select! 
{ _ = self.ticker.tick() => { diff --git a/mirrord/intproxy/src/proxies/files.rs b/mirrord/intproxy/src/proxies/files.rs index 79ac6695575..e83d2f2b764 100644 --- a/mirrord/intproxy/src/proxies/files.rs +++ b/mirrord/intproxy/src/proxies/files.rs @@ -774,7 +774,7 @@ impl BackgroundTask for FilesProxy { type MessageOut = ProxyMessage; type Error = FilesProxyError; - async fn run(mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { + async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { while let Some(message) = message_bus.recv().await { tracing::trace!(?message, "new message in message_bus"); diff --git a/mirrord/intproxy/src/proxies/incoming.rs b/mirrord/intproxy/src/proxies/incoming.rs index 966d1175acb..759574607d8 100644 --- a/mirrord/intproxy/src/proxies/incoming.rs +++ b/mirrord/intproxy/src/proxies/incoming.rs @@ -453,7 +453,7 @@ impl BackgroundTask for IncomingProxy { type MessageOut = ProxyMessage; #[tracing::instrument(level = Level::TRACE, skip_all, err)] - async fn run(mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { + async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { loop { tokio::select! { Some(((connection_id, request_id), stream_item)) = self.response_body_rxs.next() => match stream_item { diff --git a/mirrord/intproxy/src/proxies/incoming/interceptor.rs b/mirrord/intproxy/src/proxies/incoming/interceptor.rs index 2d6486d709f..31acfe82f21 100644 --- a/mirrord/intproxy/src/proxies/incoming/interceptor.rs +++ b/mirrord/intproxy/src/proxies/incoming/interceptor.rs @@ -126,7 +126,7 @@ pub type InterceptorResult = core::result::Result /// When it received [`MessageIn::Http`], it starts acting as an HTTP gateway. pub struct Interceptor { /// Socket that should be used to make the first connection (should already be bound). - socket: TcpSocket, + socket: Option, /// Address of user app's listener. 
peer: SocketAddr, /// Version of [`mirrord_protocol`] negotiated with the agent. @@ -146,7 +146,7 @@ impl Interceptor { agent_protocol_version: Option, ) -> Self { Self { - socket, + socket: Some(socket), peer, agent_protocol_version, } @@ -159,8 +159,15 @@ impl BackgroundTask for Interceptor { type MessageOut = MessageOut; #[tracing::instrument(level = Level::TRACE, skip_all, err)] - async fn run(self, message_bus: &mut MessageBus) -> InterceptorResult<(), Self::Error> { - let mut stream = self.socket.connect(self.peer).await?; + async fn run( + &mut self, + message_bus: &mut MessageBus, + ) -> InterceptorResult<(), Self::Error> { + let Some(socket) = self.socket.take() else { + return Ok(()); + }; + + let mut stream = socket.connect(self.peer).await?; // First, we determine whether this is a raw TCP connection or an HTTP connection. // If we receive an HTTP request from our parent task, this must be an HTTP connection. diff --git a/mirrord/intproxy/src/proxies/outgoing.rs b/mirrord/intproxy/src/proxies/outgoing.rs index d7782053aa9..1f0372b255b 100644 --- a/mirrord/intproxy/src/proxies/outgoing.rs +++ b/mirrord/intproxy/src/proxies/outgoing.rs @@ -234,7 +234,7 @@ impl BackgroundTask for OutgoingProxy { type MessageIn = OutgoingProxyMessage; type MessageOut = ProxyMessage; - async fn run(mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { + async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { loop { tokio::select! { msg = message_bus.recv() => match msg { diff --git a/mirrord/intproxy/src/proxies/outgoing/interceptor.rs b/mirrord/intproxy/src/proxies/outgoing/interceptor.rs index 12c729b6d6c..cc9f6aeabd5 100644 --- a/mirrord/intproxy/src/proxies/outgoing/interceptor.rs +++ b/mirrord/intproxy/src/proxies/outgoing/interceptor.rs @@ -12,14 +12,16 @@ use crate::{ /// Multiple instances are run as [`BackgroundTask`]s by one [`OutgoingProxy`](super::OutgoingProxy) /// to manage individual connections. 
pub struct Interceptor { - socket: PreparedSocket, + socket: Option, } impl Interceptor { /// Creates a new instance. This instance will use the provided [`PreparedSocket`] to accept the /// layer's connection and manage it. pub fn new(socket: PreparedSocket) -> Self { - Self { socket } + Self { + socket: Some(socket), + } } } @@ -42,8 +44,12 @@ impl BackgroundTask for Interceptor { /// /// 3. This implementation exits only when an error is encountered or the [`MessageBus`] is /// closed. - async fn run(self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { - let mut connected_socket = self.socket.accept().await?; + async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { + let Some(socket) = self.socket.take() else { + return Ok(()); + }; + + let mut connected_socket = socket.accept().await?; let mut reading_closed = false; loop { diff --git a/mirrord/intproxy/src/proxies/simple.rs b/mirrord/intproxy/src/proxies/simple.rs index dae7881247e..5b1433114c2 100644 --- a/mirrord/intproxy/src/proxies/simple.rs +++ b/mirrord/intproxy/src/proxies/simple.rs @@ -45,7 +45,7 @@ impl BackgroundTask for SimpleProxy { type MessageIn = SimpleProxyMessage; type MessageOut = ProxyMessage; - async fn run(mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { + async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { while let Some(msg) = message_bus.recv().await { tracing::trace!(?msg, "new message in message_bus"); diff --git a/mirrord/kube/src/api/kubernetes/portforwarder.rs b/mirrord/kube/src/api/kubernetes/portforwarder.rs index a446c496fbe..addc55a4358 100644 --- a/mirrord/kube/src/api/kubernetes/portforwarder.rs +++ b/mirrord/kube/src/api/kubernetes/portforwarder.rs @@ -167,6 +167,10 @@ impl SinglePortForwarder { error = error_future.as_mut() => { if let Some(error) = error { tracing::warn!(?connect_info, %error, "error while performing port-forward"); + } else { + tracing::warn!(?connect_info, 
"connection exited without error"); + + break; } if retry_strategy.peek().is_none() { diff --git a/mirrord/operator/src/client.rs b/mirrord/operator/src/client.rs index b92b3b1ea97..25e8569437f 100644 --- a/mirrord/operator/src/client.rs +++ b/mirrord/operator/src/client.rs @@ -44,6 +44,8 @@ mod discovery; pub mod error; mod upgrade; +pub use upgrade::WebSocketStreamId; + /// State of client's [`Certificate`] the should be attached to some operator requests. pub trait ClientCertificateState: fmt::Debug {} @@ -132,6 +134,8 @@ impl fmt::Debug for OperatorSession { /// Connection to an operator target. pub struct OperatorSessionConnection { + /// Unique ID of this connection. + pub stream_id: WebSocketStreamId, /// Session of this connection. pub session: OperatorSession, /// Used to send [`ClientMessage`]s to the operator. @@ -146,6 +150,7 @@ impl fmt::Debug for OperatorSessionConnection { let rx_queued_messages = self.rx.len(); f.debug_struct("OperatorSessionConnection") + .field("stream_id", &self.stream_id) .field("session", &self.session) .field("tx_closed", &self.tx.is_closed()) .field("tx_queued_messages", &tx_queued_messages) @@ -664,10 +669,15 @@ impl OperatorApi { }; let mut connection_subtask = progress.subtask("connecting to the target"); - let (tx, rx) = Self::connect_target(&self.client, &session).await?; + let (tx, rx, stream_id) = Self::connect_target(&self.client, &session, None).await?; connection_subtask.success(Some("connected to the target")); - Ok(OperatorSessionConnection { session, tx, rx }) + Ok(OperatorSessionConnection { + stream_id, + session, + tx, + rx, + }) } /// Returns client cert's public key in a base64 encoded string (no padding same like in @@ -742,6 +752,7 @@ impl OperatorApi { pub async fn connect_in_existing_session( layer_config: &LayerConfig, session: OperatorSession, + websocket_stream_id: Option, reporter: &mut R, ) -> OperatorApiResult where @@ -767,9 +778,15 @@ impl OperatorApi { .map_err(KubeApiError::from) 
.map_err(OperatorApiError::CreateKubeClient)?; - let (tx, rx) = Self::connect_target(&client, &session).await?; + let (tx, rx, stream_id) = + Self::connect_target(&client, &session, websocket_stream_id).await?; - Ok(OperatorSessionConnection { tx, rx, session }) + Ok(OperatorSessionConnection { + stream_id, + tx, + rx, + session, + }) } /// Creates websocket connection to the operator target. @@ -777,23 +794,33 @@ impl OperatorApi { async fn connect_target( client: &Client, session: &OperatorSession, - ) -> OperatorApiResult<(Sender, Receiver)> { + websocket_stream_id: Option, + ) -> OperatorApiResult<( + Sender, + Receiver, + WebSocketStreamId, + )> { let request = Request::builder() .uri(&session.connect_url) .header(SESSION_ID_HEADER, session.id.to_string()) .body(vec![]) .map_err(OperatorApiError::ConnectRequestBuildError)?; - let connection = upgrade::connect_ws(client, request) + let connection = upgrade::connect_ws(client, request, websocket_stream_id.as_ref()) .await .map_err(|error| OperatorApiError::KubeError { error, operation: OperatorOperation::WebsocketConnection, })?; - Ok(ConnectionWrapper::wrap( - connection, - session.operator_protocol_version.clone(), - )) + tracing::debug!( + websocket_stream_id = ?connection.stream_id, + "upgraded ws" + ); + + let (tx, rx) = + ConnectionWrapper::wrap(connection.stream, session.operator_protocol_version.clone()); + + Ok((tx, rx, connection.stream_id)) } } diff --git a/mirrord/operator/src/client/upgrade.rs b/mirrord/operator/src/client/upgrade.rs index b75d71f12c2..e8fc5a6aa8a 100644 --- a/mirrord/operator/src/client/upgrade.rs +++ b/mirrord/operator/src/client/upgrade.rs @@ -21,6 +21,8 @@ use tokio_tungstenite::{tungstenite::protocol::Role, WebSocketStream}; const WS_PROTOCOL: &str = "v4.channel.k8s.io"; +pub type WebSocketStreamId = [u8; 16]; + // Verify upgrade response according to RFC6455. // Based on `tungstenite` and added subprotocol verification. 
async fn verify_response(res: Response, key: &HeaderValue) -> Result> { @@ -96,18 +98,23 @@ async fn verify_response(res: Response, key: &HeaderValue) -> Result HeaderValue { - let random: [u8; 16] = rand::random(); +fn sec_websocket_key_header(webscoket_id: &[u8]) -> HeaderValue { base64::engine::general_purpose::STANDARD - .encode(random) + .encode(webscoket_id) .parse() .expect("should be valid") } +pub struct WebSocketStreamWithId { + pub stream_id: WebSocketStreamId, + pub stream: WebSocketStream, +} + pub async fn connect_ws( client: &Client, request: Request>, -) -> kube::Result>> { + websocket_stream_id: Option<&WebSocketStreamId>, +) -> kube::Result>> { let (mut parts, body) = request.into_parts(); parts.headers.insert( http::header::CONNECTION, @@ -120,7 +127,8 @@ pub async fn connect_ws( http::header::SEC_WEBSOCKET_VERSION, HeaderValue::from_static("13"), ); - let key = sec_websocket_key(); + let stream_id: WebSocketStreamId = rand::random(); + let key = sec_websocket_key_header(&stream_id); parts .headers .insert(http::header::SEC_WEBSOCKET_KEY, key.clone()); @@ -134,13 +142,23 @@ pub async fn connect_ws( HeaderValue::from_static(WS_PROTOCOL), ); + if let Some(reconnect_id_header) = websocket_stream_id { + parts.headers.insert( + "x-reconnect-id", + sec_websocket_key_header(reconnect_id_header), + ); + } + let res = client .send(Request::from_parts(parts, Body::from(body))) .await?; let res = verify_response(res, &key).await?; match hyper::upgrade::on(res).await { Ok(upgraded) => { - Ok(WebSocketStream::from_raw_socket(TokioIo::new(upgraded), Role::Client, None).await) + let stream = + WebSocketStream::from_raw_socket(TokioIo::new(upgraded), Role::Client, None).await; + + Ok(WebSocketStreamWithId { stream_id, stream }) } Err(e) => Err(Error::UpgradeConnection( From c654410baed0692d7922aa2bfd6328783288fc31 Mon Sep 17 00:00:00 2001 From: Dmitry Dodzin Date: Wed, 22 Jan 2025 20:13:40 +0200 Subject: [PATCH 04/20] Current attempt --- 
mirrord/intproxy/src/agent_conn.rs | 7 ++++--- mirrord/intproxy/src/background_tasks.rs | 19 +++++++++++++++++-- mirrord/intproxy/src/lib.rs | 15 +++++++++++++-- mirrord/intproxy/src/main_tasks.rs | 2 ++ mirrord/intproxy/src/proxies/incoming.rs | 17 +++++++++++++++++ .../src/proxies/incoming/subscriptions.rs | 17 +++++++++++++++-- mirrord/intproxy/src/remote_resources.rs | 1 + 7 files changed, 69 insertions(+), 9 deletions(-) diff --git a/mirrord/intproxy/src/agent_conn.rs b/mirrord/intproxy/src/agent_conn.rs index c4aa2cbb96d..2ea9a84a87c 100644 --- a/mirrord/intproxy/src/agent_conn.rs +++ b/mirrord/intproxy/src/agent_conn.rs @@ -276,11 +276,11 @@ impl BackgroundTask for AgentConnection { } impl RestartableBackgroundTask for AgentConnection { - #[tracing::instrument(level = Level::TRACE, skip(self, _bus), ret)] + #[tracing::instrument(level = Level::TRACE, skip(self, message_bus), ret)] async fn restart( &mut self, error: Self::Error, - _bus: &mut MessageBus, + message_bus: &mut MessageBus, ) -> ControlFlow { match &self.reconnect { ReconnectParams::Break => ControlFlow::Break(error), @@ -295,7 +295,7 @@ impl RestartableBackgroundTask for AgentConnection { AgentConnection::new( config, connect_info.clone().into(), - Some(websocket_stream_id.clone()), + Some(*websocket_stream_id), &mut NullReporter::default(), ) .await @@ -308,6 +308,7 @@ impl RestartableBackgroundTask for AgentConnection { match connection { Ok(connection) => { *self = connection; + message_bus.send(ProxyMessage::ConnectionRefresh).await; ControlFlow::Continue(()) } diff --git a/mirrord/intproxy/src/background_tasks.rs b/mirrord/intproxy/src/background_tasks.rs index 4eacb0d5fa8..1e974f8d236 100644 --- a/mirrord/intproxy/src/background_tasks.rs +++ b/mirrord/intproxy/src/background_tasks.rs @@ -197,6 +197,15 @@ where self.register(RestartableBackgroundTaskWrapper { task }, id, channel_size) } + pub async fn kill_task(&mut self, id: Id) { + self.streams.remove(&id); + let Some(task) = 
self.handles.remove(&id) else { + return; + }; + + task.abort(); + } + /// Returns the next update from one of registered tasks. pub async fn next(&mut self) -> Option<(Id, TaskUpdate)> { let (id, msg) = self.streams.next().await?; @@ -210,7 +219,10 @@ where .expect("task handles and streams are out of sync") .await; match res { - Err(..) => (id, TaskUpdate::Finished(Err(TaskError::Panic))), + Err(error) => { + tracing::error!(?error, "task panicked"); + (id, TaskUpdate::Finished(Err(TaskError::Panic))) + } Ok(res) => (id, TaskUpdate::Finished(res.map_err(TaskError::Error))), } } @@ -228,7 +240,10 @@ where let mut results = Vec::with_capacity(self.handles.len()); for (id, handle) in self.handles { let result = match handle.await { - Err(..) => Err(TaskError::Panic), + Err(error) => { + tracing::error!(?error, "task panicked"); + Err(TaskError::Panic) + } Ok(res) => res.map_err(TaskError::Error), }; results.push((id, result)); diff --git a/mirrord/intproxy/src/lib.rs b/mirrord/intproxy/src/lib.rs index 759d873a0bf..c1b7ab1ee7f 100644 --- a/mirrord/intproxy/src/lib.rs +++ b/mirrord/intproxy/src/lib.rs @@ -225,6 +225,7 @@ impl IntProxy { .await; } } + ProxyMessage::ConnectionRefresh => self.handle_connection_refresh().await?, } Ok(()) @@ -275,7 +276,7 @@ impl IntProxy { /// Routes most messages from the agent to the correct background task. /// Some messages are handled here. - #[tracing::instrument(level = Level::TRACE, skip(self), ret)] + #[tracing::instrument(level = Level::TRACE, skip(self))] async fn handle_agent_message(&mut self, message: DaemonMessage) -> Result<(), IntProxyError> { match message { DaemonMessage::Pong => self.task_txs.ping_pong.send(AgentSentPong).await, @@ -352,7 +353,7 @@ impl IntProxy { } /// Routes a message from the layer to the correct background task. 
- #[tracing::instrument(level = Level::TRACE, skip(self), ret)] + #[tracing::instrument(level = Level::TRACE, skip(self))] async fn handle_layer_message(&self, message: FromLayer) -> Result<(), IntProxyError> { let FromLayer { message_id, @@ -400,4 +401,14 @@ impl IntProxy { Ok(()) } + + #[tracing::instrument(level = Level::TRACE, skip(self))] + async fn handle_connection_refresh(&self) -> Result<(), IntProxyError> { + self.task_txs + .incoming + .send(IncomingProxyMessage::ConnectionRefresh) + .await; + + Ok(()) + } } diff --git a/mirrord/intproxy/src/main_tasks.rs b/mirrord/intproxy/src/main_tasks.rs index fd0e5d7e8e5..47bec301c08 100644 --- a/mirrord/intproxy/src/main_tasks.rs +++ b/mirrord/intproxy/src/main_tasks.rs @@ -19,6 +19,8 @@ pub enum ProxyMessage { FromLayer(FromLayer), /// New layer instance to serve. NewLayer(NewLayer), + /// Connection to agent was dropped and need reload. + ConnectionRefresh, } #[cfg(test)] diff --git a/mirrord/intproxy/src/proxies/incoming.rs b/mirrord/intproxy/src/proxies/incoming.rs index 759574607d8..2ef25baf665 100644 --- a/mirrord/intproxy/src/proxies/incoming.rs +++ b/mirrord/intproxy/src/proxies/incoming.rs @@ -110,6 +110,7 @@ pub enum IncomingProxyMessage { AgentSteal(DaemonTcp), /// Agent responded to [`ClientMessage::SwitchProtocolVersion`]. AgentProtocolVersion(semver::Version), + ConnectionRefresh, } /// Handle for an [`Interceptor`]. 
@@ -521,6 +522,22 @@ impl BackgroundTask for IncomingProxy { Some(IncomingProxyMessage::AgentProtocolVersion(version)) => { self.agent_protocol_version.replace(version); } + Some(IncomingProxyMessage::ConnectionRefresh) => { + self.request_body_txs.clear(); + self.response_body_rxs.clear(); + + for (interceptor_id, _) in self.interceptors.drain() { + self.background_tasks.kill_task(interceptor_id).await; + } + + for subscription in self.subscriptions.iter_mut() { + tracing::debug!(?subscription, "resubscribing"); + + for message in subscription.resubscribe() { + message_bus.send(ProxyMessage::ToAgent(message)).await + } + } + } }, Some(task_update) = self.background_tasks.next() => match task_update { diff --git a/mirrord/intproxy/src/proxies/incoming/subscriptions.rs b/mirrord/intproxy/src/proxies/incoming/subscriptions.rs index 8439e79d8e1..f015525aeec 100644 --- a/mirrord/intproxy/src/proxies/incoming/subscriptions.rs +++ b/mirrord/intproxy/src/proxies/incoming/subscriptions.rs @@ -25,7 +25,7 @@ struct Source { /// Represents a port subscription in the agent. #[derive(Debug)] -struct Subscription { +pub struct Subscription { /// Previous sources of this subscription. Each of these was at some point active, but was /// later overwritten. queued_sources: Vec, @@ -144,6 +144,15 @@ impl Subscription { )), } } + + pub fn resubscribe(&mut self) -> Vec { + self.confirmed = false; + + std::iter::once(&self.active_source) + .chain(self.queued_sources.iter()) + .map(|source| source.request.subscription.agent_subscribe()) + .collect() + } } /// Manages port subscriptions across all connected layers. @@ -154,7 +163,7 @@ impl Subscription { /// subscription requests /// 4. 
Subscription response from the agent most of the time cannot be tracked down to its /// subscription request -#[derive(Default)] +#[derive(Debug, Default)] pub struct SubscriptionsManager { remote_ports: RemoteResources<(Port, SocketAddr)>, subscriptions: HashMap, @@ -298,6 +307,10 @@ impl SubscriptionsManager { pub fn layer_forked(&mut self, parent: LayerId, child: LayerId) { self.remote_ports.clone_all(parent, child); } + + pub fn iter_mut(&mut self) -> impl Iterator + '_ { + self.subscriptions.values_mut() + } } #[cfg(test)] diff --git a/mirrord/intproxy/src/remote_resources.rs b/mirrord/intproxy/src/remote_resources.rs index 055455b7818..d80679bb948 100644 --- a/mirrord/intproxy/src/remote_resources.rs +++ b/mirrord/intproxy/src/remote_resources.rs @@ -8,6 +8,7 @@ use tracing::Level; /// For tracking remote resources allocated in the agent: open files and directories, port /// subscriptions. Remote resources can be shared by multiple layer instances because of forks. +#[derive(Debug)] pub struct RemoteResources { by_layer: HashMap>, counts: HashMap, From 3db1d89f5f3cb7e969c45a7c38c9461e7ad9f3d0 Mon Sep 17 00:00:00 2001 From: Dmitry Dodzin Date: Thu, 23 Jan 2025 12:01:35 +0200 Subject: [PATCH 05/20] Cleanup not needed --- mirrord/cli/src/internal_proxy.rs | 2 +- mirrord/cli/src/main.rs | 2 +- mirrord/intproxy/src/agent_conn.rs | 44 ++++++++----------------- mirrord/operator/src/client.rs | 45 ++++++-------------------- mirrord/operator/src/client/upgrade.rs | 28 ++++------------ 5 files changed, 31 insertions(+), 90 deletions(-) diff --git a/mirrord/cli/src/internal_proxy.rs b/mirrord/cli/src/internal_proxy.rs index e17a981cb12..1c8763a0d92 100644 --- a/mirrord/cli/src/internal_proxy.rs +++ b/mirrord/cli/src/internal_proxy.rs @@ -161,7 +161,7 @@ pub(crate) async fn connect_and_ping( connect_info: Option, analytics: &mut AnalyticsReporter, ) -> CliResult { - let mut agent_conn = AgentConnection::new(config, connect_info, None, analytics) + let mut agent_conn = 
AgentConnection::new(config, connect_info, analytics) .await .map_err(IntProxyError::from)?; diff --git a/mirrord/cli/src/main.rs b/mirrord/cli/src/main.rs index e23836b6c43..27923cab4fd 100644 --- a/mirrord/cli/src/main.rs +++ b/mirrord/cli/src/main.rs @@ -591,7 +591,7 @@ async fn port_forward(args: &PortForwardArgs, watch: drain::Watch) -> CliResult< // errors from AgentConnection::new get mapped to CliError manually to prevent unreadably long // error print-outs - let agent_conn = AgentConnection::new(&config, Some(connection_info), None, &mut analytics) + let agent_conn = AgentConnection::new(&config, Some(connection_info), &mut analytics) .await .map_err(|agent_con_error| match agent_con_error { AgentConnectionError::Io(error) => CliError::PortForwardingSetupError(error.into()), diff --git a/mirrord/intproxy/src/agent_conn.rs b/mirrord/intproxy/src/agent_conn.rs index 2ea9a84a87c..11cb00d318e 100644 --- a/mirrord/intproxy/src/agent_conn.rs +++ b/mirrord/intproxy/src/agent_conn.rs @@ -9,7 +9,6 @@ use std::{ ops::ControlFlow, path::{Path, PathBuf}, sync::Arc, - time::Duration, }; use mirrord_analytics::{NullReporter, Reporter}; @@ -18,9 +17,7 @@ use mirrord_kube::{ api::{kubernetes::AgentKubernetesConnectInfo, wrap_raw_connection}, error::KubeApiError, }; -use mirrord_operator::client::{ - error::OperatorApiError, OperatorApi, OperatorSession, WebSocketStreamId, -}; +use mirrord_operator::client::{error::OperatorApiError, OperatorApi, OperatorSession}; use mirrord_protocol::{ClientMessage, DaemonMessage}; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -105,11 +102,10 @@ pub enum AgentConnectInfo { } #[derive(Debug, Default)] -pub enum ReconnectParams { +pub enum ReconnectFlow { ConnectInfo { config: LayerConfig, connect_info: AgentConnectInfo, - websocket_stream_id: WebSocketStreamId, }, #[default] @@ -126,7 +122,7 @@ pub enum ReconnectParams { pub struct AgentConnection { pub agent_tx: Sender, pub agent_rx: Receiver, - pub reconnect: 
ReconnectParams, + pub reconnect: ReconnectFlow, } impl AgentConnection { @@ -135,25 +131,19 @@ impl AgentConnection { pub async fn new( config: &LayerConfig, connect_info: Option, - websocket_stream_id: Option, analytics: &mut R, ) -> Result { let (agent_tx, agent_rx, reconnect) = match connect_info { Some(AgentConnectInfo::Operator(session)) => { - let connection = OperatorApi::connect_in_existing_session( - config, - session.clone(), - websocket_stream_id, - analytics, - ) - .await?; + let connection = + OperatorApi::connect_in_existing_session(config, session.clone(), analytics) + .await?; ( connection.tx, connection.rx, - ReconnectParams::ConnectInfo { + ReconnectFlow::ConnectInfo { config: config.clone(), connect_info: AgentConnectInfo::Operator(session), - websocket_stream_id: connection.stream_id, }, ) } @@ -187,12 +177,12 @@ impl AgentConnection { wrap_raw_connection(stream) }; - (tx, rx, ReconnectParams::default()) + (tx, rx, ReconnectFlow::default()) } Some(AgentConnectInfo::DirectKubernetes(connect_info)) => { let (tx, rx) = portforward::create_connection(config, connect_info.clone()).await?; - (tx, rx, ReconnectParams::default()) + (tx, rx, ReconnectFlow::default()) } None => { @@ -202,7 +192,7 @@ impl AgentConnection { .ok_or(AgentConnectionError::NoConnectionMethod)?; let stream = TcpStream::connect(address).await?; let (tx, rx) = wrap_raw_connection(stream); - (tx, rx, ReconnectParams::default()) + (tx, rx, ReconnectFlow::default()) } }; @@ -220,7 +210,7 @@ impl AgentConnection { Ok(Self { agent_tx, agent_rx, - reconnect: ReconnectParams::Break, + reconnect: ReconnectFlow::Break, }) } @@ -242,8 +232,6 @@ impl BackgroundTask for AgentConnection { type MessageOut = ProxyMessage; async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { - let mut sleep = Box::pin(tokio::time::sleep(Duration::from_secs(30))); - loop { tokio::select! 
{ msg = message_bus.recv() => match msg { @@ -266,10 +254,6 @@ impl BackgroundTask for AgentConnection { } Some(msg) => message_bus.send(ProxyMessage::FromAgent(msg)).await, }, - - _ = sleep.as_mut() => { - break Err(AgentChannelError); - } } } } @@ -283,11 +267,10 @@ impl RestartableBackgroundTask for AgentConnection { message_bus: &mut MessageBus, ) -> ControlFlow { match &self.reconnect { - ReconnectParams::Break => ControlFlow::Break(error), - ReconnectParams::ConnectInfo { + ReconnectFlow::Break => ControlFlow::Break(error), + ReconnectFlow::ConnectInfo { config, connect_info, - websocket_stream_id, } => { let retry_strategy = ExponentialBackoff::from_millis(50).map(jitter).take(10); @@ -295,7 +278,6 @@ impl RestartableBackgroundTask for AgentConnection { AgentConnection::new( config, connect_info.clone().into(), - Some(*websocket_stream_id), &mut NullReporter::default(), ) .await diff --git a/mirrord/operator/src/client.rs b/mirrord/operator/src/client.rs index 25e8569437f..72e7dbdc1aa 100644 --- a/mirrord/operator/src/client.rs +++ b/mirrord/operator/src/client.rs @@ -134,8 +134,6 @@ impl fmt::Debug for OperatorSession { /// Connection to an operator target. pub struct OperatorSessionConnection { - /// Unique ID of this connection. - pub stream_id: WebSocketStreamId, /// Session of this connection. pub session: OperatorSession, /// Used to send [`ClientMessage`]s to the operator. 
@@ -150,7 +148,6 @@ impl fmt::Debug for OperatorSessionConnection { let rx_queued_messages = self.rx.len(); f.debug_struct("OperatorSessionConnection") - .field("stream_id", &self.stream_id) .field("session", &self.session) .field("tx_closed", &self.tx.is_closed()) .field("tx_queued_messages", &tx_queued_messages) @@ -669,15 +666,10 @@ impl OperatorApi { }; let mut connection_subtask = progress.subtask("connecting to the target"); - let (tx, rx, stream_id) = Self::connect_target(&self.client, &session, None).await?; + let (tx, rx) = Self::connect_target(&self.client, &session).await?; connection_subtask.success(Some("connected to the target")); - Ok(OperatorSessionConnection { - stream_id, - session, - tx, - rx, - }) + Ok(OperatorSessionConnection { session, tx, rx }) } /// Returns client cert's public key in a base64 encoded string (no padding same like in @@ -752,7 +744,6 @@ impl OperatorApi { pub async fn connect_in_existing_session( layer_config: &LayerConfig, session: OperatorSession, - websocket_stream_id: Option, reporter: &mut R, ) -> OperatorApiResult where @@ -778,15 +769,9 @@ impl OperatorApi { .map_err(KubeApiError::from) .map_err(OperatorApiError::CreateKubeClient)?; - let (tx, rx, stream_id) = - Self::connect_target(&client, &session, websocket_stream_id).await?; + let (tx, rx) = Self::connect_target(&client, &session).await?; - Ok(OperatorSessionConnection { - stream_id, - tx, - rx, - session, - }) + Ok(OperatorSessionConnection { tx, rx, session }) } /// Creates websocket connection to the operator target. 
@@ -794,33 +779,23 @@ impl OperatorApi { async fn connect_target( client: &Client, session: &OperatorSession, - websocket_stream_id: Option, - ) -> OperatorApiResult<( - Sender, - Receiver, - WebSocketStreamId, - )> { + ) -> OperatorApiResult<(Sender, Receiver)> { let request = Request::builder() .uri(&session.connect_url) .header(SESSION_ID_HEADER, session.id.to_string()) .body(vec![]) .map_err(OperatorApiError::ConnectRequestBuildError)?; - let connection = upgrade::connect_ws(client, request, websocket_stream_id.as_ref()) + let connection = upgrade::connect_ws(client, request) .await .map_err(|error| OperatorApiError::KubeError { error, operation: OperatorOperation::WebsocketConnection, })?; - tracing::debug!( - websocket_stream_id = ?connection.stream_id, - "upgraded ws" - ); - - let (tx, rx) = - ConnectionWrapper::wrap(connection.stream, session.operator_protocol_version.clone()); - - Ok((tx, rx, connection.stream_id)) + Ok(ConnectionWrapper::wrap( + connection, + session.operator_protocol_version.clone(), + )) } } diff --git a/mirrord/operator/src/client/upgrade.rs b/mirrord/operator/src/client/upgrade.rs index e8fc5a6aa8a..d144f492786 100644 --- a/mirrord/operator/src/client/upgrade.rs +++ b/mirrord/operator/src/client/upgrade.rs @@ -98,23 +98,18 @@ async fn verify_response(res: Response, key: &HeaderValue) -> Result HeaderValue { +fn sec_websocket_key_header() -> HeaderValue { + let random: [u8; 16] = rand::random(); base64::engine::general_purpose::STANDARD - .encode(webscoket_id) + .encode(random) .parse() .expect("should be valid") } -pub struct WebSocketStreamWithId { - pub stream_id: WebSocketStreamId, - pub stream: WebSocketStream, -} - pub async fn connect_ws( client: &Client, request: Request>, - websocket_stream_id: Option<&WebSocketStreamId>, -) -> kube::Result>> { +) -> kube::Result>> { let (mut parts, body) = request.into_parts(); parts.headers.insert( http::header::CONNECTION, @@ -127,8 +122,7 @@ pub async fn connect_ws( 
http::header::SEC_WEBSOCKET_VERSION, HeaderValue::from_static("13"), ); - let stream_id: WebSocketStreamId = rand::random(); - let key = sec_websocket_key_header(&stream_id); + let key = sec_websocket_key_header(); parts .headers .insert(http::header::SEC_WEBSOCKET_KEY, key.clone()); @@ -142,23 +136,13 @@ pub async fn connect_ws( HeaderValue::from_static(WS_PROTOCOL), ); - if let Some(reconnect_id_header) = websocket_stream_id { - parts.headers.insert( - "x-reconnect-id", - sec_websocket_key_header(reconnect_id_header), - ); - } - let res = client .send(Request::from_parts(parts, Body::from(body))) .await?; let res = verify_response(res, &key).await?; match hyper::upgrade::on(res).await { Ok(upgraded) => { - let stream = - WebSocketStream::from_raw_socket(TokioIo::new(upgraded), Role::Client, None).await; - - Ok(WebSocketStreamWithId { stream_id, stream }) + Ok(WebSocketStream::from_raw_socket(TokioIo::new(upgraded), Role::Client, None).await) } Err(e) => Err(Error::UpgradeConnection( From cf17c0c63586fcbc64ba01c7871eda0106b4680b Mon Sep 17 00:00:00 2001 From: Dmitry Dodzin Date: Thu, 23 Jan 2025 15:12:45 +0200 Subject: [PATCH 06/20] A tiny bit more cleanup --- mirrord/intproxy/src/proxies/incoming/subscriptions.rs | 2 +- mirrord/intproxy/src/remote_resources.rs | 1 - mirrord/operator/src/client.rs | 2 -- mirrord/operator/src/client/upgrade.rs | 6 ++---- 4 files changed, 3 insertions(+), 8 deletions(-) diff --git a/mirrord/intproxy/src/proxies/incoming/subscriptions.rs b/mirrord/intproxy/src/proxies/incoming/subscriptions.rs index f015525aeec..eb2e0a718c5 100644 --- a/mirrord/intproxy/src/proxies/incoming/subscriptions.rs +++ b/mirrord/intproxy/src/proxies/incoming/subscriptions.rs @@ -163,7 +163,7 @@ impl Subscription { /// subscription requests /// 4. 
Subscription response from the agent most of the time cannot be tracked down to its /// subscription request -#[derive(Debug, Default)] +#[derive(Default)] pub struct SubscriptionsManager { remote_ports: RemoteResources<(Port, SocketAddr)>, subscriptions: HashMap, diff --git a/mirrord/intproxy/src/remote_resources.rs b/mirrord/intproxy/src/remote_resources.rs index d80679bb948..055455b7818 100644 --- a/mirrord/intproxy/src/remote_resources.rs +++ b/mirrord/intproxy/src/remote_resources.rs @@ -8,7 +8,6 @@ use tracing::Level; /// For tracking remote resources allocated in the agent: open files and directories, port /// subscriptions. Remote resources can be shared by multiple layer instances because of forks. -#[derive(Debug)] pub struct RemoteResources { by_layer: HashMap>, counts: HashMap, diff --git a/mirrord/operator/src/client.rs b/mirrord/operator/src/client.rs index 72e7dbdc1aa..b92b3b1ea97 100644 --- a/mirrord/operator/src/client.rs +++ b/mirrord/operator/src/client.rs @@ -44,8 +44,6 @@ mod discovery; pub mod error; mod upgrade; -pub use upgrade::WebSocketStreamId; - /// State of client's [`Certificate`] the should be attached to some operator requests. pub trait ClientCertificateState: fmt::Debug {} diff --git a/mirrord/operator/src/client/upgrade.rs b/mirrord/operator/src/client/upgrade.rs index d144f492786..b75d71f12c2 100644 --- a/mirrord/operator/src/client/upgrade.rs +++ b/mirrord/operator/src/client/upgrade.rs @@ -21,8 +21,6 @@ use tokio_tungstenite::{tungstenite::protocol::Role, WebSocketStream}; const WS_PROTOCOL: &str = "v4.channel.k8s.io"; -pub type WebSocketStreamId = [u8; 16]; - // Verify upgrade response according to RFC6455. // Based on `tungstenite` and added subprotocol verification. 
async fn verify_response(res: Response, key: &HeaderValue) -> Result> { @@ -98,7 +96,7 @@ async fn verify_response(res: Response, key: &HeaderValue) -> Result HeaderValue { +fn sec_websocket_key() -> HeaderValue { let random: [u8; 16] = rand::random(); base64::engine::general_purpose::STANDARD .encode(random) @@ -122,7 +120,7 @@ pub async fn connect_ws( http::header::SEC_WEBSOCKET_VERSION, HeaderValue::from_static("13"), ); - let key = sec_websocket_key_header(); + let key = sec_websocket_key(); parts .headers .insert(http::header::SEC_WEBSOCKET_KEY, key.clone()); From dde8e562e94e9aff1d1cd2a717493940bfb05526 Mon Sep 17 00:00:00 2001 From: Dmitry Dodzin Date: Thu, 23 Jan 2025 16:15:50 +0200 Subject: [PATCH 07/20] Changelog --- changelog.d/+2901.fixed.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/+2901.fixed.md diff --git a/changelog.d/+2901.fixed.md b/changelog.d/+2901.fixed.md new file mode 100644 index 00000000000..b09e8b1d2ac --- /dev/null +++ b/changelog.d/+2901.fixed.md @@ -0,0 +1 @@ +Add reconnection mechanism when using mirrord operator. From 2393d06127161a7a1bc20251225ad6f041f1fa90 Mon Sep 17 00:00:00 2001 From: Dmitry Dodzin Date: Tue, 28 Jan 2025 14:01:52 +0200 Subject: [PATCH 08/20] Docs and bus -> message_bus --- mirrord/intproxy/src/background_tasks.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/mirrord/intproxy/src/background_tasks.rs b/mirrord/intproxy/src/background_tasks.rs index 315bbc7af15..f4987ea0e85 100644 --- a/mirrord/intproxy/src/background_tasks.rs +++ b/mirrord/intproxy/src/background_tasks.rs @@ -20,6 +20,7 @@ use tokio_stream::{wrappers::ReceiverStream, StreamExt, StreamMap, StreamNotifyC pub struct MessageBus { tx: Sender, rx: Receiver, + // Note if adding any new fields do look at `MessageBus::cast`'s unsafe block. 
} impl MessageBus { @@ -37,10 +38,13 @@ impl MessageBus { } } + /// Cast `&mut MessageBus` as `&mut MessageBus` only if they share the same message types pub fn cast(&mut self) -> &mut MessageBus where R: BackgroundTask, { + // SAFETY: since MessageBus consits of only the `Sender` and `Receiver` and both should + // match. unsafe { &mut *(self as *mut MessageBus as *mut MessageBus) } } @@ -148,9 +152,9 @@ where type MessageIn = T::MessageIn; type MessageOut = T::MessageOut; - async fn run(&mut self, bus: &mut MessageBus) -> Result<(), Self::Error> { + async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { let RestartableBackgroundTaskWrapper { task } = self; - let task_bus = bus.cast(); + let task_bus = message_bus.cast(); match task.run(task_bus).await { Err(run_error) => { From de738d4df14e23f89bea1bff5c1e81dfdb171d7f Mon Sep 17 00:00:00 2001 From: Dmitry Dodzin Date: Tue, 28 Jan 2025 14:26:41 +0200 Subject: [PATCH 09/20] Tiny --- mirrord/intproxy/src/lib.rs | 6 +++--- mirrord/intproxy/src/main_tasks.rs | 2 +- mirrord/kube/src/api/kubernetes/portforwarder.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/mirrord/intproxy/src/lib.rs b/mirrord/intproxy/src/lib.rs index e75583e2ab8..8d5c1d2fa4f 100644 --- a/mirrord/intproxy/src/lib.rs +++ b/mirrord/intproxy/src/lib.rs @@ -276,7 +276,7 @@ impl IntProxy { /// Routes most messages from the agent to the correct background task. /// Some messages are handled here. - #[tracing::instrument(level = Level::TRACE, skip(self))] + #[tracing::instrument(level = Level::TRACE, skip(self), err)] async fn handle_agent_message(&mut self, message: DaemonMessage) -> Result<(), IntProxyError> { match message { DaemonMessage::Pong => self.task_txs.ping_pong.send(AgentSentPong).await, @@ -361,7 +361,7 @@ impl IntProxy { } /// Routes a message from the layer to the correct background task. 
- #[tracing::instrument(level = Level::TRACE, skip(self))] + #[tracing::instrument(level = Level::TRACE, skip(self), err)] async fn handle_layer_message(&self, message: FromLayer) -> Result<(), IntProxyError> { let FromLayer { message_id, @@ -410,7 +410,7 @@ impl IntProxy { Ok(()) } - #[tracing::instrument(level = Level::TRACE, skip(self))] + #[tracing::instrument(level = Level::TRACE, skip(self), err)] async fn handle_connection_refresh(&self) -> Result<(), IntProxyError> { self.task_txs .incoming diff --git a/mirrord/intproxy/src/main_tasks.rs b/mirrord/intproxy/src/main_tasks.rs index 47bec301c08..7b7810c3a9c 100644 --- a/mirrord/intproxy/src/main_tasks.rs +++ b/mirrord/intproxy/src/main_tasks.rs @@ -19,7 +19,7 @@ pub enum ProxyMessage { FromLayer(FromLayer), /// New layer instance to serve. NewLayer(NewLayer), - /// Connection to agent was dropped and need reload. + /// Connection to agent was dropped and needs reload. ConnectionRefresh, } diff --git a/mirrord/kube/src/api/kubernetes/portforwarder.rs b/mirrord/kube/src/api/kubernetes/portforwarder.rs index addc55a4358..defdbcf7313 100644 --- a/mirrord/kube/src/api/kubernetes/portforwarder.rs +++ b/mirrord/kube/src/api/kubernetes/portforwarder.rs @@ -168,13 +168,13 @@ impl SinglePortForwarder { if let Some(error) = error { tracing::warn!(?connect_info, %error, "error while performing port-forward"); } else { - tracing::warn!(?connect_info, "connection exited without error"); + tracing::warn!(?connect_info, "port-forward connection closed without any error published in relevant channel"); break; } if retry_strategy.peek().is_none() { - tracing::warn!(?connect_info, "finished retry strategy, closing connection"); + tracing::warn!(?connect_info, "port-forward connection retry strategy has reached it's limit on attempts to connect, not attempting again."); break; } From 17e5c1db3fc57d71b62185ef67761e687613dd95 Mon Sep 17 00:00:00 2001 From: Dmitry Dodzin Date: Tue, 28 Jan 2025 14:31:04 +0200 Subject: [PATCH 10/20] 
Ops --- changelog.d/+update-single-portforward.changed.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/+update-single-portforward.changed.md b/changelog.d/+update-single-portforward.changed.md index b0a3a984dd8..5e2abfecc3e 100644 --- a/changelog.d/+update-single-portforward.changed.md +++ b/changelog.d/+update-single-portforward.changed.md @@ -1 +1 @@ -Add a naive update to our port forward wrapper to force fist check error channel instead of ranomly picking branch on `tokio::select!` impl. +Add a naive update to our port forward wrapper to force first check error channel instead of ranomly picking branch on `tokio::select!` impl. From ff822ee74f914aedec78d292ca167d1f4c696d16 Mon Sep 17 00:00:00 2001 From: Dmitry Dodzin Date: Tue, 28 Jan 2025 14:37:33 +0200 Subject: [PATCH 11/20] Docs --- mirrord/intproxy/src/background_tasks.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/mirrord/intproxy/src/background_tasks.rs b/mirrord/intproxy/src/background_tasks.rs index f4987ea0e85..988ca56f332 100644 --- a/mirrord/intproxy/src/background_tasks.rs +++ b/mirrord/intproxy/src/background_tasks.rs @@ -137,6 +137,10 @@ pub trait RestartableBackgroundTask: BackgroundTask { ) -> impl Future> + Send; } +/// Small wrapper for `RestartableBackgroundTask` that wraps in reimplemets `BackgroundTask` with +/// logic of calling `restart` when an error is returned from `run` future. 
+/// +/// This is the created and used in `BackgroundTasks::register_restartable` pub struct RestartableBackgroundTaskWrapper { task: T, } From 1e729b843caddb8e5513a18255b6082f15742971 Mon Sep 17 00:00:00 2001 From: Dmitry Dodzin Date: Tue, 28 Jan 2025 16:47:26 +0200 Subject: [PATCH 12/20] Send SwitchProtocolVersion on ConnectionRefresh --- mirrord/intproxy/src/lib.rs | 5 +++++ mirrord/intproxy/src/proxies/simple.rs | 9 +++++++++ 2 files changed, 14 insertions(+) diff --git a/mirrord/intproxy/src/lib.rs b/mirrord/intproxy/src/lib.rs index 8d5c1d2fa4f..145be8752db 100644 --- a/mirrord/intproxy/src/lib.rs +++ b/mirrord/intproxy/src/lib.rs @@ -412,6 +412,11 @@ impl IntProxy { #[tracing::instrument(level = Level::TRACE, skip(self), err)] async fn handle_connection_refresh(&self) -> Result<(), IntProxyError> { + self.task_txs + .simple + .send(SimpleProxyMessage::ConnectionRefresh) + .await; + self.task_txs .incoming .send(IncomingProxyMessage::ConnectionRefresh) diff --git a/mirrord/intproxy/src/proxies/simple.rs b/mirrord/intproxy/src/proxies/simple.rs index 6efa5416865..2141d549417 100644 --- a/mirrord/intproxy/src/proxies/simple.rs +++ b/mirrord/intproxy/src/proxies/simple.rs @@ -27,6 +27,8 @@ pub enum SimpleProxyMessage { GetEnvRes(RemoteResult>), /// Protocol version was negotiated with the agent. 
ProtocolVersion(Version), + /// Agent connection was refreshed need to negotiate version + ConnectionRefresh, } #[derive(Error, Debug)] @@ -123,6 +125,13 @@ impl BackgroundTask for SimpleProxy { .await } SimpleProxyMessage::ProtocolVersion(version) => self.set_protocol_version(version), + SimpleProxyMessage::ConnectionRefresh => { + if let Some(version) = &self.protocol_version { + message_bus + .send(ClientMessage::SwitchProtocolVersion(version.clone())) + .await + } + } } } From 910d9b937dd2adc30f87056985e5a221d163a144 Mon Sep 17 00:00:00 2001 From: Dmitry Dodzin Date: Sun, 2 Feb 2025 16:18:18 +0200 Subject: [PATCH 13/20] Update --- mirrord/intproxy/src/background_tasks.rs | 17 +++++------------ mirrord/intproxy/src/lib.rs | 22 ++++++++++++++++++++-- mirrord/intproxy/src/proxies/files.rs | 17 +++++++++++++++++ mirrord/intproxy/src/proxies/incoming.rs | 8 +++----- mirrord/intproxy/src/proxies/simple.rs | 9 --------- mirrord/intproxy/src/remote_resources.rs | 12 ++++++++++++ 6 files changed, 57 insertions(+), 28 deletions(-) diff --git a/mirrord/intproxy/src/background_tasks.rs b/mirrord/intproxy/src/background_tasks.rs index 988ca56f332..a425bbf19a2 100644 --- a/mirrord/intproxy/src/background_tasks.rs +++ b/mirrord/intproxy/src/background_tasks.rs @@ -39,7 +39,7 @@ impl MessageBus { } /// Cast `&mut MessageBus` as `&mut MessageBus` only if they share the same message types - pub fn cast(&mut self) -> &mut MessageBus + pub(crate) fn cast(&mut self) -> &mut MessageBus where R: BackgroundTask, { @@ -267,17 +267,10 @@ where self.register(RestartableBackgroundTaskWrapper { task }, id, channel_size) } - pub fn tasks_ids(&self) -> impl Iterator { - self.handles.keys() - } - - pub async fn kill_task(&mut self, id: Id) { - self.streams.remove(&id); - let Some(task) = self.handles.remove(&id) else { - return; - }; - - task.abort(); + pub fn clear(&mut self) { + for (id, _) in self.handles.drain() { + self.streams.remove(&id); + } } /// Returns the next update from one of 
registered tasks. diff --git a/mirrord/intproxy/src/lib.rs b/mirrord/intproxy/src/lib.rs index 145be8752db..b904434fc46 100644 --- a/mirrord/intproxy/src/lib.rs +++ b/mirrord/intproxy/src/lib.rs @@ -18,6 +18,7 @@ use proxies::{ outgoing::{OutgoingProxy, OutgoingProxyMessage}, simple::{SimpleProxy, SimpleProxyMessage}, }; +use semver::Version; use tokio::{net::TcpListener, time}; use tracing::Level; @@ -60,6 +61,10 @@ pub struct IntProxy { any_connection_accepted: bool, background_tasks: BackgroundTasks, task_txs: TaskTxs, + /// [`mirrord_protocol`] version negotiated with the agent. + /// Determines whether we can use some messages, like [`FileRequest::ReadDirBatch`] or + /// [`FileRequest::ReadLink`]. + protocol_version: Option, } impl IntProxy { @@ -128,6 +133,7 @@ impl IntProxy { ping_pong, files, }, + protocol_version: None, } } @@ -318,6 +324,8 @@ impl IntProxy { .await } DaemonMessage::SwitchProtocolVersionResponse(protocol_version) => { + let _ = self.protocol_version.insert(protocol_version.clone()); + if CLIENT_READY_FOR_LOGS.matches(&protocol_version) { self.task_txs.agent.send(ClientMessage::ReadyForLogs).await; } @@ -413,8 +421,18 @@ impl IntProxy { #[tracing::instrument(level = Level::TRACE, skip(self), err)] async fn handle_connection_refresh(&self) -> Result<(), IntProxyError> { self.task_txs - .simple - .send(SimpleProxyMessage::ConnectionRefresh) + .agent + .send(ClientMessage::SwitchProtocolVersion( + self.protocol_version + .as_ref() + .unwrap_or(&mirrord_protocol::VERSION) + .clone(), + )) + .await; + + self.task_txs + .files + .send(FilesProxyMessage::ConnectionRefresh) .await; self.task_txs diff --git a/mirrord/intproxy/src/proxies/files.rs b/mirrord/intproxy/src/proxies/files.rs index 0d24d9d41c5..60da47e3eca 100644 --- a/mirrord/intproxy/src/proxies/files.rs +++ b/mirrord/intproxy/src/proxies/files.rs @@ -36,6 +36,8 @@ pub enum FilesProxyMessage { LayerForked(LayerForked), /// Layer instance closed. 
LayerClosed(LayerClosed), + /// Agent connection was refreshed + ConnectionRefresh, } /// Error that can occur in [`FilesProxy`]. @@ -761,6 +763,20 @@ impl FilesProxy { Ok(()) } + + async fn handle_reconnect(&mut self, _message_bus: &mut MessageBus) { + for (_, fds) in self.remote_files.drain() { + for fd in fds { + self.buffered_files.remove(&fd); + } + } + + for (_, fds) in self.remote_dirs.drain() { + for fd in fds { + self.buffered_dirs.remove(&fd); + } + } + } } impl BackgroundTask for FilesProxy { @@ -785,6 +801,7 @@ impl BackgroundTask for FilesProxy { } FilesProxyMessage::LayerForked(forked) => self.layer_forked(forked), FilesProxyMessage::ProtocolVersion(version) => self.protocol_version(version), + FilesProxyMessage::ConnectionRefresh => self.handle_reconnect(message_bus).await, } } diff --git a/mirrord/intproxy/src/proxies/incoming.rs b/mirrord/intproxy/src/proxies/incoming.rs index 3c57b630b19..f16112e1b0f 100644 --- a/mirrord/intproxy/src/proxies/incoming.rs +++ b/mirrord/intproxy/src/proxies/incoming.rs @@ -536,11 +536,9 @@ impl IncomingProxy { } IncomingProxyMessage::ConnectionRefresh => { - let running_task_ids = self.tasks.tasks_ids().cloned().collect::>(); - - for task in running_task_ids { - self.tasks.kill_task(task).await; - } + self.mirror_tcp_proxies.clear(); + self.steal_tcp_proxies.clear(); + self.tasks.clear(); for subscription in self.subscriptions.iter_mut() { tracing::debug!(?subscription, "resubscribing"); diff --git a/mirrord/intproxy/src/proxies/simple.rs b/mirrord/intproxy/src/proxies/simple.rs index 2141d549417..6efa5416865 100644 --- a/mirrord/intproxy/src/proxies/simple.rs +++ b/mirrord/intproxy/src/proxies/simple.rs @@ -27,8 +27,6 @@ pub enum SimpleProxyMessage { GetEnvRes(RemoteResult>), /// Protocol version was negotiated with the agent. 
ProtocolVersion(Version), - /// Agent connection was refreshed need to negotiate version - ConnectionRefresh, } #[derive(Error, Debug)] @@ -125,13 +123,6 @@ impl BackgroundTask for SimpleProxy { .await } SimpleProxyMessage::ProtocolVersion(version) => self.set_protocol_version(version), - SimpleProxyMessage::ConnectionRefresh => { - if let Some(version) = &self.protocol_version { - message_bus - .send(ClientMessage::SwitchProtocolVersion(version.clone())) - .await - } - } } } diff --git a/mirrord/intproxy/src/remote_resources.rs b/mirrord/intproxy/src/remote_resources.rs index 055455b7818..645ca3fece2 100644 --- a/mirrord/intproxy/src/remote_resources.rs +++ b/mirrord/intproxy/src/remote_resources.rs @@ -123,4 +123,16 @@ where *self.counts.entry(resource).or_default() += 1; } } + + /// Removes all resources held all layers instances. + /// Returns an [`Iterator`] of layers and remote files/folders that were removed. + /// + /// Should be used for when the remote is lost and there is a need to restart. 
+ #[tracing::instrument(level = Level::TRACE, skip(self))] + pub(crate) fn drain(&mut self) -> impl '_ + Iterator)> { + let ids: Vec<_> = self.by_layer.keys().cloned().collect(); + + ids.into_iter() + .map(|id| (id, self.remove_all(id).collect())) + } } From 8c379a4fbecd7671d2b919dc558c3eb32abd1958 Mon Sep 17 00:00:00 2001 From: Dmitry Dodzin Date: Sun, 2 Feb 2025 16:43:50 +0200 Subject: [PATCH 14/20] Update mirrord/kube/src/api/kubernetes/portforwarder.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: MichaƂ Smolarek <34063647+Razz4780@users.noreply.github.com> --- mirrord/kube/src/api/kubernetes/portforwarder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mirrord/kube/src/api/kubernetes/portforwarder.rs b/mirrord/kube/src/api/kubernetes/portforwarder.rs index defdbcf7313..893e4ee82da 100644 --- a/mirrord/kube/src/api/kubernetes/portforwarder.rs +++ b/mirrord/kube/src/api/kubernetes/portforwarder.rs @@ -215,7 +215,7 @@ pub async fn retry_portforward( client: &Client, connect_info: AgentKubernetesConnectInfo, ) -> Result<(Box, SinglePortForwarder)> { - // use 1024 * 1024 to be identical to `kube` implementation (https://github.com/kube-rs/kube/blob/ecbdafc214538aadc78ec8447f2fa12d0057492b/kube-client/src/api/portforward.rs#L101) + // use 1024 * 1024 to be identical to [`kube` implementation](https://github.com/kube-rs/kube/blob/ecbdafc214538aadc78ec8447f2fa12d0057492b/kube-client/src/api/portforward.rs#L101). 
let (lhs, rhs) = tokio::io::duplex(1024 * 1024); let port_forwarder = SinglePortForwarder::connect(client, connect_info, Box::new(rhs)).await?; From 48d4f7f37e4b1051cf3ffb38dba5492551b1edbf Mon Sep 17 00:00:00 2001 From: Dmitry Dodzin Date: Sun, 2 Feb 2025 18:07:00 +0200 Subject: [PATCH 15/20] Resubscribe only active_source --- mirrord/intproxy/src/proxies/incoming.rs | 6 +++--- mirrord/intproxy/src/proxies/incoming/subscriptions.rs | 7 ++----- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/mirrord/intproxy/src/proxies/incoming.rs b/mirrord/intproxy/src/proxies/incoming.rs index f16112e1b0f..d0700afefe7 100644 --- a/mirrord/intproxy/src/proxies/incoming.rs +++ b/mirrord/intproxy/src/proxies/incoming.rs @@ -543,9 +543,9 @@ impl IncomingProxy { for subscription in self.subscriptions.iter_mut() { tracing::debug!(?subscription, "resubscribing"); - for message in subscription.resubscribe() { - message_bus.send(ProxyMessage::ToAgent(message)).await - } + message_bus + .send(ProxyMessage::ToAgent(subscription.resubscribe_message())) + .await } } } diff --git a/mirrord/intproxy/src/proxies/incoming/subscriptions.rs b/mirrord/intproxy/src/proxies/incoming/subscriptions.rs index be9dcbdb315..5f41ce0ca1a 100644 --- a/mirrord/intproxy/src/proxies/incoming/subscriptions.rs +++ b/mirrord/intproxy/src/proxies/incoming/subscriptions.rs @@ -145,13 +145,10 @@ impl Subscription { } } - pub fn resubscribe(&mut self) -> Vec { + pub fn resubscribe_message(&mut self) -> ClientMessage { self.confirmed = false; - std::iter::once(&self.active_source) - .chain(self.queued_sources.iter()) - .map(|source| source.request.subscription.agent_subscribe()) - .collect() + self.active_source.request.subscription.agent_subscribe() } } From d79e53cb25ebdb3bc91b0084fa25db797328efb7 Mon Sep 17 00:00:00 2001 From: Dmitry Dodzin Date: Sun, 2 Feb 2025 18:30:17 +0200 Subject: [PATCH 16/20] Doc fix --- mirrord/intproxy/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git 
a/mirrord/intproxy/src/lib.rs b/mirrord/intproxy/src/lib.rs index b904434fc46..b639e0aeacb 100644 --- a/mirrord/intproxy/src/lib.rs +++ b/mirrord/intproxy/src/lib.rs @@ -61,9 +61,8 @@ pub struct IntProxy { any_connection_accepted: bool, background_tasks: BackgroundTasks, task_txs: TaskTxs, + /// [`mirrord_protocol`] version negotiated with the agent. - /// Determines whether we can use some messages, like [`FileRequest::ReadDirBatch`] or - /// [`FileRequest::ReadLink`]. protocol_version: Option, } From abe8709f325263ed2b4a88f8068b85fb1c91be70 Mon Sep 17 00:00:00 2001 From: Dmitry Dodzin Date: Mon, 3 Feb 2025 14:43:43 +0200 Subject: [PATCH 17/20] Initial update to fs --- mirrord/intproxy/src/proxies/files.rs | 300 +++++++++++++++++++++++++- 1 file changed, 289 insertions(+), 11 deletions(-) diff --git a/mirrord/intproxy/src/proxies/files.rs b/mirrord/intproxy/src/proxies/files.rs index 60da47e3eca..f195a87e694 100644 --- a/mirrord/intproxy/src/proxies/files.rs +++ b/mirrord/intproxy/src/proxies/files.rs @@ -1,15 +1,13 @@ use core::fmt; -use std::{collections::HashMap, vec}; +use std::{ + collections::{HashMap, VecDeque}, + vec, +}; use mirrord_intproxy_protocol::{LayerId, MessageId, ProxyToLayerMessage}; use mirrord_protocol::{ - file::{ - CloseDirRequest, CloseFileRequest, DirEntryInternal, ReadDirBatchRequest, ReadDirResponse, - ReadFileResponse, ReadLimitedFileRequest, SeekFromInternal, MKDIR_VERSION, - READDIR_BATCH_VERSION, READLINK_VERSION, RMDIR_VERSION, STATFS_VERSION, - }, - ClientMessage, DaemonMessage, ErrorKindInternal, FileRequest, FileResponse, RemoteIOError, - ResponseError, + file::*, ClientMessage, DaemonMessage, ErrorKindInternal, FileRequest, FileResponse, + RemoteIOError, ResponseError, }; use semver::Version; use thiserror::Error; @@ -23,6 +21,107 @@ use crate::{ request_queue::RequestQueue, }; +fn agent_lost_io_error() -> ResponseError { + ResponseError::RemoteIO(RemoteIOError { + raw_os_error: None, + kind: 
ErrorKindInternal::Unknown("connection with mirrord-agent was lost".to_string()), + }) +} + +macro_rules! dummy_file_response { + ($name: ident) => { + FileResponse::$name(Err(ResponseError::NotImplemented)) + }; +} + +/// Lightweight (no allocations) [`FileResponse`] to be returned when connection with the +/// mirrord-agent is lost. Must be converted into real [`FileResponse`] via [`From`]. +pub struct AgentLostFileResponse(LayerId, MessageId, FileResponse); + +impl From for ToLayer { + fn from(value: AgentLostFileResponse) -> Self { + let AgentLostFileResponse(layer_id, message_id, response) = value; + let error = agent_lost_io_error(); + + let real_response = match response { + FileResponse::Access(..) => FileResponse::Access(Err(error)), + FileResponse::GetDEnts64(..) => FileResponse::GetDEnts64(Err(error)), + FileResponse::Open(..) => FileResponse::Open(Err(error)), + FileResponse::OpenDir(..) => FileResponse::OpenDir(Err(error)), + FileResponse::Read(..) => FileResponse::Read(Err(error)), + FileResponse::ReadDir(..) => FileResponse::ReadDir(Err(error)), + FileResponse::ReadDirBatch(..) => FileResponse::ReadDirBatch(Err(error)), + FileResponse::ReadLimited(..) => FileResponse::ReadLimited(Err(error)), + FileResponse::Seek(..) => FileResponse::Seek(Err(error)), + FileResponse::Write(..) => FileResponse::Write(Err(error)), + FileResponse::WriteLimited(..) => FileResponse::WriteLimited(Err(error)), + FileResponse::Xstat(..) => FileResponse::Xstat(Err(error)), + FileResponse::XstatFs(..) => FileResponse::XstatFs(Err(error)), + FileResponse::ReadLink(..) => FileResponse::ReadLink(Err(error)), + FileResponse::MakeDir(..) => FileResponse::MakeDir(Err(error)), + FileResponse::RemoveDir(..) => FileResponse::RemoveDir(Err(error)), + FileResponse::Unlink(..) 
=> FileResponse::Unlink(Err(error)), + }; + + debug_assert_eq!( + std::mem::discriminant(&response), + std::mem::discriminant(&real_response), + ); + + ToLayer { + layer_id, + message_id, + message: ProxyToLayerMessage::File(real_response), + } + } +} + +/// Convenience trait for [`FileRequest`]. +trait FileRequestExt: Sized { + /// If this [`FileRequest`] requires a [`FileResponse`] from the agent, return corresponding + /// [`AgentLostFileResponse`]. + fn agent_lost_response( + &self, + layer_id: LayerId, + message_id: MessageId, + ) -> Option; +} + +impl FileRequestExt for FileRequest { + fn agent_lost_response( + &self, + layer_id: LayerId, + message_id: MessageId, + ) -> Option { + let response = match self { + Self::Close(..) | Self::CloseDir(..) => return None, + Self::Access(..) => dummy_file_response!(Access), + Self::FdOpenDir(..) => dummy_file_response!(OpenDir), + Self::GetDEnts64(..) => dummy_file_response!(GetDEnts64), + Self::Open(..) => dummy_file_response!(Open), + Self::OpenRelative(..) => dummy_file_response!(Open), + Self::Read(..) => dummy_file_response!(Read), + Self::ReadDir(..) => dummy_file_response!(ReadDir), + Self::ReadDirBatch(..) => dummy_file_response!(ReadDirBatch), + Self::ReadLimited(..) => dummy_file_response!(ReadLimited), + Self::Seek(..) => dummy_file_response!(Seek), + Self::Write(..) => dummy_file_response!(Write), + Self::WriteLimited(..) => dummy_file_response!(WriteLimited), + Self::Xstat(..) => dummy_file_response!(Xstat), + Self::XstatFs(..) => dummy_file_response!(XstatFs), + Self::ReadLink(..) => dummy_file_response!(ReadLink), + Self::MakeDir(..) => dummy_file_response!(MakeDir), + Self::MakeDirAt(..) => dummy_file_response!(MakeDir), + Self::Unlink(..) => dummy_file_response!(Unlink), + Self::UnlinkAt(..) => dummy_file_response!(Unlink), + Self::RemoveDir(..) => dummy_file_response!(RemoveDir), + Self::StatFs(..) 
=> dummy_file_response!(XstatFs), + }; + + Some(AgentLostFileResponse(layer_id, message_id, response)) + } +} + /// Messages handled by [`FilesProxy`]. #[derive(Debug)] pub enum FilesProxyMessage { @@ -125,6 +224,164 @@ enum AdditionalRequestData { Other, } +/// Manages state of file operations. Remaps remote file descriptors and returns early +/// [`ResponseError`]s for [`FileRequest`]s related to invalidated (agent lost) descriptors. +/// Tracks state of outstanding [`FileRequest`]s to respond with errors in case the agent is lost. +#[derive(Default)] +pub struct RouterFileOps { + /// Highest file fd we've returned to the client (after remapping). + highest_user_facing_fd: Option, + /// Offset we need to add to every fd we receive from the mirrord-agent. + /// All lesser fds received from the clients are invalid (probably lost with previous + /// mirrord-agent responsible for file ops). + current_fd_offset: u64, + /// Prepared error responses to outstanding [`FileRequest`]s. + /// We must flush these when connection to the mirrord-agent is lost, otherwise the layer will + /// hang. + queued_error_responses: VecDeque, +} + +impl fmt::Debug for RouterFileOps { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RouterFileOps") + .field("highest_user_facing_fd", &self.highest_user_facing_fd) + .field("current_fd_offset", &self.current_fd_offset) + .finish() + } +} + +impl RouterFileOps { + /// Return a request to be sent to the agent ([`Ok`] variant) or + /// a response to be sent to the user ([`Err`] variant). + #[tracing::instrument(level = Level::TRACE, ret, err(level = Level::TRACE, Debug))] + pub fn map_request( + &mut self, + layer_id: LayerId, + message_id: MessageId, + mut request: FileRequest, + ) -> Result, ToLayer> { + match &mut request { + // These requests do not refer to any open remote fd. + // It's safe to pass them as they are. + FileRequest::Open(..) + | FileRequest::Access(..) 
+ | FileRequest::Xstat(XstatRequest { fd: None, .. }) + | FileRequest::ReadLink(..) + | FileRequest::MakeDir(..) + | FileRequest::Unlink(..) + | FileRequest::RemoveDir(..) + | FileRequest::StatFs(..) + | FileRequest::UnlinkAt(UnlinkAtRequest { dirfd: None, .. }) => {} + + // These requests do not require any response from the agent. + // We need to remap the fd, but if the fd is invalid we simply drop them. + FileRequest::Close(CloseFileRequest { fd: remote_fd }) + | FileRequest::CloseDir(CloseDirRequest { remote_fd }) => { + if *remote_fd < self.current_fd_offset { + return Ok(None); + } + + *remote_fd -= self.current_fd_offset; + } + + // These requests refer to an open remote fd and require a response from the agent. + // We need to remap the fd and respond with an error if the fd is invalid. + FileRequest::FdOpenDir(FdOpenDirRequest { remote_fd }) + | FileRequest::GetDEnts64(GetDEnts64Request { remote_fd, .. }) + | FileRequest::OpenRelative(OpenRelativeFileRequest { + relative_fd: remote_fd, + .. + }) + | FileRequest::Read(ReadFileRequest { remote_fd, .. }) + | FileRequest::ReadDir(ReadDirRequest { remote_fd, .. }) + | FileRequest::ReadDirBatch(ReadDirBatchRequest { remote_fd, .. }) + | FileRequest::ReadLimited(ReadLimitedFileRequest { remote_fd, .. }) + | FileRequest::Seek(SeekFileRequest { fd: remote_fd, .. }) + | FileRequest::Write(WriteFileRequest { fd: remote_fd, .. }) + | FileRequest::WriteLimited(WriteLimitedFileRequest { remote_fd, .. }) + | FileRequest::Xstat(XstatRequest { + fd: Some(remote_fd), + .. + }) + | FileRequest::XstatFs(XstatFsRequest { fd: remote_fd }) + | FileRequest::MakeDirAt(MakeDirAtRequest { + dirfd: remote_fd, .. + }) + | FileRequest::UnlinkAt(UnlinkAtRequest { + dirfd: Some(remote_fd), + .. 
+ }) => { + if *remote_fd < self.current_fd_offset { + let error_response = request + .agent_lost_response(layer_id, message_id) + .expect("these requests require responses") + .into(); + return Err(error_response); + } + + *remote_fd -= self.current_fd_offset; + } + }; + + if let Some(response) = request.agent_lost_response(layer_id, message_id) { + self.queued_error_responses.push_back(response); + } + + Ok(Some(request)) + } + + /// Return a response to be sent to the client. + #[tracing::instrument(level = Level::TRACE, ret)] + pub fn map_response(&mut self, mut response: FileResponse) -> FileResponse { + match &mut response { + // These responses do not refer to any open remote fd. + FileResponse::Access(..) + | FileResponse::Read(..) + | FileResponse::ReadLimited(..) + | FileResponse::ReadDir(..) + | FileResponse::Seek(..) + | FileResponse::Write(..) + | FileResponse::WriteLimited(..) + | FileResponse::Xstat(..) + | FileResponse::XstatFs(..) + | FileResponse::GetDEnts64(Err(..)) + | FileResponse::Open(Err(..)) + | FileResponse::OpenDir(Err(..)) + | FileResponse::ReadDirBatch(Err(..)) + | FileResponse::ReadLink(..) + | FileResponse::MakeDir(..) + | FileResponse::Unlink(..) + | FileResponse::RemoveDir(..) => {} + + FileResponse::GetDEnts64(Ok(GetDEnts64Response { fd: remote_fd, .. })) + | FileResponse::Open(Ok(OpenFileResponse { fd: remote_fd })) + | FileResponse::OpenDir(Ok(OpenDirResponse { fd: remote_fd, .. })) + | FileResponse::ReadDirBatch(Ok(ReadDirBatchResponse { fd: remote_fd, .. })) => { + *remote_fd += self.current_fd_offset; + + self.highest_user_facing_fd = + std::cmp::max(self.highest_user_facing_fd, Some(*remote_fd)); + } + } + + self.queued_error_responses.pop_front(); + + response + } + + /// Notify this manager that the agent was lost. + /// Return messages to be sent to the user. 
+ #[tracing::instrument(level = Level::TRACE)] + pub fn agent_lost(&mut self) -> impl Iterator { + self.current_fd_offset = self.highest_user_facing_fd.map(|fd| fd + 1).unwrap_or(0); + + std::mem::take(&mut self.queued_error_responses) + .into_iter() + .map(ToLayer::from) + .map(ProxyMessage::ToLayer) + } +} + /// For handling all file operations. /// Run as a [`BackgroundTask`]. /// @@ -171,6 +428,8 @@ pub struct FilesProxy { remote_dirs: RemoteResources, /// Locally stored data of buffered directories. buffered_dirs: HashMap, + + reconnect_tracker: RouterFileOps, } impl fmt::Debug for FilesProxy { @@ -182,6 +441,7 @@ impl fmt::Debug for FilesProxy { .field("buffered_dirs", &self.buffered_dirs) .field("protocol_version", &self.protocol_version) .field("request_queue", &self.request_queue) + .field("reconnect_tracker", &self.reconnect_tracker) .finish() } } @@ -208,6 +468,8 @@ impl FilesProxy { remote_dirs: Default::default(), buffered_dirs: Default::default(), + + reconnect_tracker: Default::default(), } } @@ -764,7 +1026,7 @@ impl FilesProxy { Ok(()) } - async fn handle_reconnect(&mut self, _message_bus: &mut MessageBus) { + async fn handle_reconnect(&mut self, message_bus: &mut MessageBus) { for (_, fds) in self.remote_files.drain() { for fd in fds { self.buffered_files.remove(&fd); @@ -776,6 +1038,10 @@ impl FilesProxy { self.buffered_dirs.remove(&fd); } } + + for response in self.reconnect_tracker.agent_lost() { + message_bus.send(response).await; + } } } @@ -790,10 +1056,22 @@ impl BackgroundTask for FilesProxy { match message { FilesProxyMessage::FileReq(message_id, layer_id, request) => { - self.file_request(request, layer_id, message_id, message_bus) - .await; + match self + .reconnect_tracker + .map_request(layer_id, message_id, request) + { + Ok(None) => {} + Err(response) => { + message_bus.send(response).await; + } + Ok(Some(request)) => { + self.file_request(request, layer_id, message_id, message_bus) + .await + } + }; } 
FilesProxyMessage::FileRes(response) => { + let response = self.reconnect_tracker.map_response(response); self.file_response(response, message_bus).await?; } FilesProxyMessage::LayerClosed(closed) => { From ffdc1b018a2785694a81c0a1eaee67768481dedb Mon Sep 17 00:00:00 2001 From: Dmitry Dodzin Date: Mon, 3 Feb 2025 18:17:06 +0200 Subject: [PATCH 18/20] Remove unsafe --- mirrord/intproxy/src/background_tasks.rs | 45 +++++++++++------------- 1 file changed, 20 insertions(+), 25 deletions(-) diff --git a/mirrord/intproxy/src/background_tasks.rs b/mirrord/intproxy/src/background_tasks.rs index a425bbf19a2..ee4a7d8a24c 100644 --- a/mirrord/intproxy/src/background_tasks.rs +++ b/mirrord/intproxy/src/background_tasks.rs @@ -15,41 +15,34 @@ use tokio::{ }; use tokio_stream::{wrappers::ReceiverStream, StreamExt, StreamMap, StreamNotifyClose}; +pub type MessageBus = + MessageBusInner<::MessageIn, ::MessageOut>; + /// A struct that is meant to be the only way the [`BackgroundTask`]s can communicate with their /// parents. It allows the tasks to send and receive messages. -pub struct MessageBus { - tx: Sender, - rx: Receiver, +pub struct MessageBusInner { + tx: Sender, + rx: Receiver, // Note if adding any new fields do look at `MessageBus::cast`'s unsafe block. } -impl MessageBus { +impl MessageBusInner { /// Attempts to send a message to this task's parent. - pub async fn send>(&self, msg: M) { + pub async fn send>(&self, msg: M) { let _ = self.tx.send(msg.into()).await; } /// Receives a message from this task's parent. /// [`None`] means that the channel is closed and there will be no more messages. - pub async fn recv(&mut self) -> Option { + pub async fn recv(&mut self) -> Option { tokio::select! 
{ _ = self.tx.closed() => None, msg = self.rx.recv() => msg, } } - /// Cast `&mut MessageBus` as `&mut MessageBus` only if they share the same message types - pub(crate) fn cast(&mut self) -> &mut MessageBus - where - R: BackgroundTask, - { - // SAFETY: since MessageBus consits of only the `Sender` and `Receiver` and both should - // match. - unsafe { &mut *(self as *mut MessageBus as *mut MessageBus) } - } - /// Returns a [`Closed`] instance for this [`MessageBus`]. - pub(crate) fn closed(&self) -> Closed { + pub(crate) fn closed(&self) -> Closed { Closed(self.tx.clone()) } } @@ -93,9 +86,9 @@ impl MessageBus { /// } /// } /// ``` -pub(crate) struct Closed(Sender); +pub(crate) struct Closed(Sender); -impl Closed { +impl Closed { /// Resolves the given [`Future`], unless the origin [`MessageBus`] closes first. /// /// # Returns @@ -158,20 +151,22 @@ where async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { let RestartableBackgroundTaskWrapper { task } = self; - let task_bus = message_bus.cast(); - match task.run(task_bus).await { + match task.run(message_bus).await { Err(run_error) => { let mut run_error = Some(run_error); loop { match task - .restart(run_error.take().expect("should contain an error"), task_bus) + .restart( + run_error.take().expect("should contain an error"), + message_bus, + ) .await { ControlFlow::Break(err) => return Err(err), ControlFlow::Continue(()) => { - if let Err(err) = task.run(task_bus).await { + if let Err(err) = task.run(message_bus).await { run_error = Some(err); } else { return Ok(()); @@ -206,7 +201,7 @@ impl BackgroundTasks where Id: fmt::Debug + Hash + PartialEq + Eq + Clone + Unpin, Err: 'static + Send, - MOut: Send + Unpin, + MOut: 'static + Send + Unpin, { /// Registers a new background task in this struct. Returns a [`TaskSender`] that can be used to /// send messages to the task. 
Dropping this sender will close the channel of messages @@ -239,7 +234,7 @@ where StreamNotifyClose::new(ReceiverStream::new(out_msg_rx)), ); - let mut message_bus = MessageBus { + let mut message_bus = MessageBus:: { tx: out_msg_tx, rx: in_msg_rx, }; From 2e2ccd72e38737fd0e3982fbb0ecfc45f3a11e13 Mon Sep 17 00:00:00 2001 From: Dmitry Dodzin Date: Mon, 3 Feb 2025 18:56:22 +0200 Subject: [PATCH 19/20] Add Version Check --- mirrord/intproxy/src/agent_conn.rs | 19 +++++++++++++++---- mirrord/operator/src/client.rs | 5 +++++ 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/mirrord/intproxy/src/agent_conn.rs b/mirrord/intproxy/src/agent_conn.rs index 11cb00d318e..7e448f8e5ff 100644 --- a/mirrord/intproxy/src/agent_conn.rs +++ b/mirrord/intproxy/src/agent_conn.rs @@ -8,7 +8,7 @@ use std::{ net::{IpAddr, SocketAddr}, ops::ControlFlow, path::{Path, PathBuf}, - sync::Arc, + sync::{Arc, LazyLock}, }; use mirrord_analytics::{NullReporter, Reporter}; @@ -19,6 +19,7 @@ use mirrord_kube::{ }; use mirrord_operator::client::{error::OperatorApiError, OperatorApi, OperatorSession}; use mirrord_protocol::{ClientMessage, DaemonMessage}; +use semver::VersionReq; use serde::{Deserialize, Serialize}; use thiserror::Error; use tokio::{ @@ -42,6 +43,12 @@ use crate::{ mod portforward; +static OPERATOR_RETRY_VERSION: LazyLock = LazyLock::new(|| { + ">3.104.2" + .parse() + .expect("OPERATOR_RETRY_VERSION should be a valid VersionReq value") +}); + /// Errors that can occur when the internal proxy tries to establish a connection with the agent. 
#[derive(Error, Debug)] pub enum AgentConnectionError { @@ -141,9 +148,13 @@ impl AgentConnection { ( connection.tx, connection.rx, - ReconnectFlow::ConnectInfo { - config: config.clone(), - connect_info: AgentConnectInfo::Operator(session), + if OPERATOR_RETRY_VERSION.matches(&session.operator_version) { + ReconnectFlow::ConnectInfo { + config: config.clone(), + connect_info: AgentConnectInfo::Operator(session), + } + } else { + ReconnectFlow::default() }, ) } diff --git a/mirrord/operator/src/client.rs b/mirrord/operator/src/client.rs index 26f3d66e5b1..8144c99a988 100644 --- a/mirrord/operator/src/client.rs +++ b/mirrord/operator/src/client.rs @@ -113,6 +113,9 @@ pub struct OperatorSession { /// Version of [`mirrord_protocol`] used by the operator. /// Used to create [`ConnectionWrapper`]. pub operator_protocol_version: Option, + + /// Version of the operator. + pub operator_version: Version, } impl fmt::Debug for OperatorSession { @@ -126,6 +129,7 @@ impl fmt::Debug for OperatorSession { &self.operator_license_fingerprint, ) .field("operator_protocol_version", &self.operator_protocol_version) + .field("operator_version", &self.operator_version) .finish() } } @@ -662,6 +666,7 @@ impl OperatorApi { .protocol_version .as_ref() .and_then(|version| version.parse().ok()), + operator_version: self.operator.spec.operator_version.clone(), }; let mut connection_subtask = progress.subtask("connecting to the target"); From 32998d3af266d2659ce5ac26097cc62d91ea3106 Mon Sep 17 00:00:00 2001 From: Dmitry Dodzin Date: Mon, 3 Feb 2025 20:40:12 +0200 Subject: [PATCH 20/20] Update reconnect to be sort of stateful with message queue --- mirrord/intproxy/src/agent_conn.rs | 10 +++- mirrord/intproxy/src/lib.rs | 88 +++++++++++++++++++++++------- mirrord/intproxy/src/main_tasks.rs | 10 +++- 3 files changed, 84 insertions(+), 24 deletions(-) diff --git a/mirrord/intproxy/src/agent_conn.rs b/mirrord/intproxy/src/agent_conn.rs index 7e448f8e5ff..99621a23a20 100644 --- 
a/mirrord/intproxy/src/agent_conn.rs +++ b/mirrord/intproxy/src/agent_conn.rs @@ -38,7 +38,7 @@ use tracing::Level; use crate::{ background_tasks::{BackgroundTask, MessageBus, RestartableBackgroundTask}, - ProxyMessage, + main_tasks::{ConnectionRefresh, ProxyMessage}, }; mod portforward; @@ -283,6 +283,10 @@ impl RestartableBackgroundTask for AgentConnection { config, connect_info, } => { + message_bus + .send(ProxyMessage::ConnectionRefresh(ConnectionRefresh::Start)) + .await; + let retry_strategy = ExponentialBackoff::from_millis(50).map(jitter).take(10); let connection = Retry::spawn(retry_strategy, || async move { @@ -301,7 +305,9 @@ impl RestartableBackgroundTask for AgentConnection { match connection { Ok(connection) => { *self = connection; - message_bus.send(ProxyMessage::ConnectionRefresh).await; + message_bus + .send(ProxyMessage::ConnectionRefresh(ConnectionRefresh::End)) + .await; ControlFlow::Continue(()) } diff --git a/mirrord/intproxy/src/lib.rs b/mirrord/intproxy/src/lib.rs index b639e0aeacb..1162db0ccbb 100644 --- a/mirrord/intproxy/src/lib.rs +++ b/mirrord/intproxy/src/lib.rs @@ -2,7 +2,10 @@ #![warn(clippy::indexing_slicing)] #![deny(unused_crate_dependencies)] -use std::{collections::HashMap, time::Duration}; +use std::{ + collections::{HashMap, VecDeque}, + time::Duration, +}; use background_tasks::{BackgroundTasks, TaskSender, TaskUpdate}; use error::UnexpectedAgentMessage; @@ -26,7 +29,7 @@ use crate::{ agent_conn::AgentConnection, background_tasks::{RestartableBackgroundTaskWrapper, TaskError}, error::IntProxyError, - main_tasks::LayerClosed, + main_tasks::{ConnectionRefresh, LayerClosed}, }; pub mod agent_conn; @@ -64,6 +67,10 @@ pub struct IntProxy { /// [`mirrord_protocol`] version negotiated with the agent. protocol_version: Option, + + /// Temporary message queue for any [`ProxyMessage`] from layer or to agent that are sent + /// during reconnection state. 
+ reconnect_task_queue: Option>, } impl IntProxy { @@ -133,6 +140,7 @@ impl IntProxy { files, }, protocol_version: None, + reconnect_task_queue: Default::default(), } } @@ -186,6 +194,15 @@ impl IntProxy { /// [`ProxyMessage::NewLayer`] is handled here, as an exception. async fn handle(&mut self, msg: ProxyMessage) -> Result<(), IntProxyError> { match msg { + ProxyMessage::NewLayer(_) | ProxyMessage::FromLayer(_) | ProxyMessage::ToAgent(_) + if self.reconnect_task_queue.is_some() => + { + // We are in reconnect state so should queue this message. + self.reconnect_task_queue + .as_mut() + .expect("reconnect_task_queue should contain value when in reconnect state") + .push_back(msg); + } ProxyMessage::NewLayer(new_layer) => { self.any_connection_accepted = true; @@ -230,7 +247,7 @@ impl IntProxy { .await; } } - ProxyMessage::ConnectionRefresh => self.handle_connection_refresh().await?, + ProxyMessage::ConnectionRefresh(kind) => self.handle_connection_refresh(kind).await?, } Ok(()) @@ -418,26 +435,55 @@ impl IntProxy { } #[tracing::instrument(level = Level::TRACE, skip(self), err)] - async fn handle_connection_refresh(&self) -> Result<(), IntProxyError> { - self.task_txs - .agent - .send(ClientMessage::SwitchProtocolVersion( - self.protocol_version - .as_ref() - .unwrap_or(&mirrord_protocol::VERSION) - .clone(), - )) - .await; + async fn handle_connection_refresh( + &mut self, + kind: ConnectionRefresh, + ) -> Result<(), IntProxyError> { + match kind { + ConnectionRefresh::Start => { + // Initialise default reconnect message queue + self.reconnect_task_queue.get_or_insert_default(); + } + ConnectionRefresh::End => { + let Some(task_queue) = self.reconnect_task_queue.take() else { + return Err(IntProxyError::AgentFailed( + "unexpected state: agent reconnected finished without correctly initialzing a reconnect" + .into(), + )); + }; - self.task_txs - .files - .send(FilesProxyMessage::ConnectionRefresh) - .await; + self.task_txs + .agent + 
.send(ClientMessage::SwitchProtocolVersion( + self.protocol_version + .as_ref() + .unwrap_or(&mirrord_protocol::VERSION) + .clone(), + )) + .await; - self.task_txs - .incoming - .send(IncomingProxyMessage::ConnectionRefresh) - .await; + self.task_txs + .files + .send(FilesProxyMessage::ConnectionRefresh) + .await; + + self.task_txs + .incoming + .send(IncomingProxyMessage::ConnectionRefresh) + .await; + + Box::pin(async { + for msg in task_queue { + tracing::debug!(?msg, "dequeueing message for reconnect"); + + self.handle(msg).await? + } + + Ok::<(), IntProxyError>(()) + }) + .await? + } + } Ok(()) } diff --git a/mirrord/intproxy/src/main_tasks.rs b/mirrord/intproxy/src/main_tasks.rs index 7b7810c3a9c..bb61c8ec90f 100644 --- a/mirrord/intproxy/src/main_tasks.rs +++ b/mirrord/intproxy/src/main_tasks.rs @@ -20,7 +20,7 @@ pub enum ProxyMessage { /// New layer instance to serve. NewLayer(NewLayer), /// Connection to agent was dropped and needs reload. - ConnectionRefresh, + ConnectionRefresh(ConnectionRefresh), } #[cfg(test)] @@ -138,3 +138,11 @@ pub struct LayerForked { pub struct LayerClosed { pub id: LayerId, } + +/// Notification about start and end of reconnection to agent. +#[derive(Debug, Clone, Copy)] +#[cfg_attr(test, derive(PartialEq, Eq))] +pub enum ConnectionRefresh { + Start, + End, +}