diff --git a/Cargo.lock b/Cargo.lock index b580cd7c9fb..57b04477805 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4347,6 +4347,7 @@ dependencies = [ "serde", "thiserror 2.0.11", "tokio", + "tokio-retry", "tokio-rustls 0.26.1", "tokio-stream", "tracing", diff --git a/changelog.d/+2901.fixed.md b/changelog.d/+2901.fixed.md new file mode 100644 index 00000000000..b09e8b1d2ac --- /dev/null +++ b/changelog.d/+2901.fixed.md @@ -0,0 +1 @@ +Add reconnection mechanism when using mirrord operator. diff --git a/changelog.d/+update-single-portforward.changed.md b/changelog.d/+update-single-portforward.changed.md new file mode 100644 index 00000000000..5e2abfecc3e --- /dev/null +++ b/changelog.d/+update-single-portforward.changed.md @@ -0,0 +1 @@ +Update our port forward wrapper to check the error channel first instead of randomly picking a branch in the `tokio::select!` implementation. diff --git a/mirrord/intproxy/Cargo.toml b/mirrord/intproxy/Cargo.toml index 8b39387ec59..0086e9a9f3d 100644 --- a/mirrord/intproxy/Cargo.toml +++ b/mirrord/intproxy/Cargo.toml @@ -37,6 +37,7 @@ hyper-util.workspace = true http-body-util.workspace = true bytes.workspace = true rand.workspace = true +tokio-retry = "0.3" tokio-rustls.workspace = true rustls.workspace = true rustls-pemfile.workspace = true diff --git a/mirrord/intproxy/src/agent_conn.rs b/mirrord/intproxy/src/agent_conn.rs index 1d982778411..99621a23a20 100644 --- a/mirrord/intproxy/src/agent_conn.rs +++ b/mirrord/intproxy/src/agent_conn.rs @@ -6,21 +6,20 @@ use std::{ io, io::BufReader, net::{IpAddr, SocketAddr}, + ops::ControlFlow, path::{Path, PathBuf}, - sync::Arc, + sync::{Arc, LazyLock}, }; -use mirrord_analytics::Reporter; +use mirrord_analytics::{NullReporter, Reporter}; use mirrord_config::LayerConfig; use mirrord_kube::{ - api::{ - kubernetes::{AgentKubernetesConnectInfo, KubernetesAPI}, - wrap_raw_connection, - }, + api::{kubernetes::AgentKubernetesConnectInfo, wrap_raw_connection}, error::KubeApiError, }; use mirrord_operator::client::{error::OperatorApiError, OperatorApi, OperatorSession}; use mirrord_protocol::{ClientMessage, DaemonMessage}; +use semver::VersionReq; use serde::{Deserialize, Serialize}; use thiserror::Error; use tokio::{ @@ -30,14 +29,26 @@ use tokio::{ mpsc::{Receiver, Sender}, }, }; +use tokio_retry::{ + strategy::{jitter, ExponentialBackoff}, + Retry, +}; use tokio_rustls::TlsConnector; use tracing::Level; use crate::{ - background_tasks::{BackgroundTask, MessageBus}, - ProxyMessage, + background_tasks::{BackgroundTask, MessageBus, RestartableBackgroundTask}, + main_tasks::{ConnectionRefresh, ProxyMessage}, }; +mod portforward; + +static OPERATOR_RETRY_VERSION: LazyLock = LazyLock::new(|| { + ">3.104.2" + .parse() + .expect("OPERATOR_RETRY_VERSION should be a valid VersionReq value") +}); + /// Errors that can occur when the internal proxy tries to establish a connection with the agent. #[derive(Error, Debug)] pub enum AgentConnectionError { @@ -97,6 +108,17 @@ pub enum AgentConnectInfo { DirectKubernetes(AgentKubernetesConnectInfo), } +#[derive(Debug, Default)] +pub enum ReconnectFlow { + ConnectInfo { + config: LayerConfig, + connect_info: AgentConnectInfo, + }, + + #[default] + Break, +} + /// Handles logic of the `proxy <-> agent` connection as a [`BackgroundTask`].
/// /// # Note @@ -107,6 +129,7 @@ pub enum AgentConnectInfo { pub struct AgentConnection { pub agent_tx: Sender, pub agent_rx: Receiver, + pub reconnect: ReconnectFlow, } impl AgentConnection { @@ -117,11 +140,23 @@ impl AgentConnection { connect_info: Option, analytics: &mut R, ) -> Result { - let (agent_tx, agent_rx) = match connect_info { + let (agent_tx, agent_rx, reconnect) = match connect_info { Some(AgentConnectInfo::Operator(session)) => { let connection = - OperatorApi::connect_in_existing_session(config, session, analytics).await?; - (connection.tx, connection.rx) + OperatorApi::connect_in_existing_session(config, session.clone(), analytics) + .await?; + ( + connection.tx, + connection.rx, + if OPERATOR_RETRY_VERSION.matches(&session.operator_version) { + ReconnectFlow::ConnectInfo { + config: config.clone(), + connect_info: AgentConnectInfo::Operator(session), + } + } else { + ReconnectFlow::default() + }, + ) } Some(AgentConnectInfo::ExternalProxy(proxy_addr)) => { @@ -131,7 +166,7 @@ impl AgentConnection { let stream = socket.connect(proxy_addr).await?; - if config.external_proxy.tls_enable + let (tx, rx) = if config.external_proxy.tls_enable && let ( Some(tls_certificate), Some(client_tls_certificate), @@ -140,8 +175,7 @@ impl AgentConnection { config.external_proxy.tls_certificate.as_ref(), config.internal_proxy.client_tls_certificate.as_ref(), config.internal_proxy.client_tls_key.as_ref(), - ) - { + ) { wrap_connection_with_tls( stream, proxy_addr.ip(), @@ -152,20 +186,14 @@ impl AgentConnection { .await? } else { wrap_raw_connection(stream) - } + }; + + (tx, rx, ReconnectFlow::default()) } Some(AgentConnectInfo::DirectKubernetes(connect_info)) => { - let k8s_api = KubernetesAPI::create(config) - .await - .map_err(AgentConnectionError::Kube)?; - - let stream = k8s_api - .create_connection_portforward(connect_info.clone()) - .await - .map_err(AgentConnectionError::Kube)?; - - wrap_raw_connection(stream) + let (tx, rx) = portforward::create_connection(config, connect_info.clone()).await?; + (tx, rx, ReconnectFlow::default()) } None => { @@ -174,18 +202,27 @@ impl AgentConnection { .as_ref() .ok_or(AgentConnectionError::NoConnectionMethod)?; let stream = TcpStream::connect(address).await?; - wrap_raw_connection(stream) + let (tx, rx) = wrap_raw_connection(stream); + (tx, rx, ReconnectFlow::default()) } }; - Ok(Self { agent_tx, agent_rx }) + Ok(Self { + agent_tx, + agent_rx, + reconnect, + }) } pub async fn new_for_raw_address(address: SocketAddr) -> Result { let stream = TcpStream::connect(address).await?; let (agent_tx, agent_rx) = wrap_raw_connection(stream); - Ok(Self { agent_tx, agent_rx }) + Ok(Self { + agent_tx, + agent_rx, + reconnect: ReconnectFlow::Break, + }) } #[tracing::instrument(level = Level::TRACE, name = "send_agent_message", skip(self), ret)] @@ -205,7 +242,7 @@ impl BackgroundTask for AgentConnection { type MessageIn = ClientMessage; type MessageOut = ProxyMessage; - async fn run(mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { + async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { loop { tokio::select! 
{ msg = message_bus.recv() => match msg { @@ -227,6 +264,58 @@ impl BackgroundTask for AgentConnection { break Err(AgentChannelError); } Some(msg) => message_bus.send(ProxyMessage::FromAgent(msg)).await, + }, + } + } + } +} + +impl RestartableBackgroundTask for AgentConnection { + #[tracing::instrument(level = Level::TRACE, skip(self, message_bus), ret)] + async fn restart( + &mut self, + error: Self::Error, + message_bus: &mut MessageBus, + ) -> ControlFlow { + match &self.reconnect { + ReconnectFlow::Break => ControlFlow::Break(error), + ReconnectFlow::ConnectInfo { + config, + connect_info, + } => { + message_bus + .send(ProxyMessage::ConnectionRefresh(ConnectionRefresh::Start)) + .await; + + let retry_strategy = ExponentialBackoff::from_millis(50).map(jitter).take(10); + + let connection = Retry::spawn(retry_strategy, || async move { + AgentConnection::new( + config, + connect_info.clone().into(), + &mut NullReporter::default(), + ) + .await + .inspect_err( + |err| tracing::error!(error = ?err, "unable to connect to agent upon retry"), + ) + }) + .await; + + match connection { + Ok(connection) => { + *self = connection; + message_bus + .send(ProxyMessage::ConnectionRefresh(ConnectionRefresh::End)) + .await; + + ControlFlow::Continue(()) + } + Err(error) => { + tracing::error!(?error, "unable to reconnect agent"); + + ControlFlow::Break(AgentChannelError) + } } } } diff --git a/mirrord/intproxy/src/agent_conn/portforward.rs b/mirrord/intproxy/src/agent_conn/portforward.rs new file mode 100644 index 00000000000..9632323cc1f --- /dev/null +++ b/mirrord/intproxy/src/agent_conn/portforward.rs @@ -0,0 +1,25 @@ +use mirrord_config::LayerConfig; +use mirrord_kube::api::{ + kubernetes::{AgentKubernetesConnectInfo, KubernetesAPI}, + wrap_raw_connection, +}; +use mirrord_protocol::{ClientMessage, DaemonMessage}; +use tokio::sync::mpsc; + +use crate::agent_conn::AgentConnectionError; + +pub async fn create_connection( + config: &LayerConfig, + connect_info: AgentKubernetesConnectInfo, +) -> Result<(mpsc::Sender, mpsc::Receiver), AgentConnectionError> { + let k8s_api = KubernetesAPI::create(config) + .await + .map_err(AgentConnectionError::Kube)?; + + let stream = k8s_api + .create_connection_portforward(connect_info.clone()) + .await + .map_err(AgentConnectionError::Kube)?; + + Ok(wrap_raw_connection(stream)) +} diff --git a/mirrord/intproxy/src/background_tasks.rs b/mirrord/intproxy/src/background_tasks.rs index 2c73f80bece..ee4a7d8a24c 100644 --- a/mirrord/intproxy/src/background_tasks.rs +++ b/mirrord/intproxy/src/background_tasks.rs @@ -6,7 +6,7 @@ //! Each background task implements the [`BackgroundTask`] trait, which specifies its properties and //! allows for managing groups of related tasks with one [`BackgroundTasks`] instance. -use std::{collections::HashMap, fmt, future::Future, hash::Hash}; +use std::{collections::HashMap, fmt, future::Future, hash::Hash, ops::ControlFlow}; use thiserror::Error; use tokio::{ @@ -15,22 +15,26 @@ use tokio::{ }; use tokio_stream::{wrappers::ReceiverStream, StreamExt, StreamMap, StreamNotifyClose}; +pub type MessageBus = + MessageBusInner<::MessageIn, ::MessageOut>; + /// A struct that is meant to be the only way the [`BackgroundTask`]s can communicate with their /// parents. It allows the tasks to send and receive messages. -pub struct MessageBus { - tx: Sender, - rx: Receiver, +pub struct MessageBusInner { + tx: Sender, + rx: Receiver, + // Note if adding any new fields do look at `MessageBus::cast`'s unsafe block. 
} -impl MessageBus { +impl MessageBusInner { /// Attempts to send a message to this task's parent. - pub async fn send>(&self, msg: M) { + pub async fn send>(&self, msg: M) { let _ = self.tx.send(msg.into()).await; } /// Receives a message from this task's parent. /// [`None`] means that the channel is closed and there will be no more messages. - pub async fn recv(&mut self) -> Option { + pub async fn recv(&mut self) -> Option { tokio::select! { _ = self.tx.closed() => None, msg = self.rx.recv() => msg, @@ -38,7 +42,7 @@ impl MessageBus { } /// Returns a [`Closed`] instance for this [`MessageBus`]. - pub(crate) fn closed(&self) -> Closed { + pub(crate) fn closed(&self) -> Closed { Closed(self.tx.clone()) } } @@ -82,9 +86,9 @@ impl MessageBus { /// } /// } /// ``` -pub(crate) struct Closed(Sender); +pub(crate) struct Closed(Sender); -impl Closed { +impl Closed { /// Resolves the given [`Future`], unless the origin [`MessageBus`] closes first. /// /// # Returns /// @@ -113,11 +117,69 @@ pub trait BackgroundTask: Sized { /// When the [`MessageBus`] has no more messages to be consumed, the task should exit without /// errors. fn run( - self, + &mut self, message_bus: &mut MessageBus, ) -> impl Future> + Send; } +pub trait RestartableBackgroundTask: BackgroundTask { + fn restart( + &mut self, + run_error: Self::Error, + message_bus: &mut MessageBus, + ) -> impl Future> + Send; } +/// Small wrapper around a `RestartableBackgroundTask` that reimplements `BackgroundTask` with +/// the logic of calling `restart` when an error is returned from the `run` future. +/// +/// This is created and used in `BackgroundTasks::register_restartable`. +pub struct RestartableBackgroundTaskWrapper { task: T, } +impl BackgroundTask for RestartableBackgroundTaskWrapper +where + T: RestartableBackgroundTask + Send, + T::MessageIn: Send, + T::MessageOut: Send, + T::Error: Send, +{ + type Error = T::Error; + type MessageIn = T::MessageIn; + type MessageOut = T::MessageOut; + + async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { + let RestartableBackgroundTaskWrapper { task } = self; + + match task.run(message_bus).await { + Err(run_error) => { + let mut run_error = Some(run_error); + + loop { + match task + .restart( + run_error.take().expect("should contain an error"), + message_bus, + ) + .await + { + ControlFlow::Break(err) => return Err(err), + ControlFlow::Continue(()) => { + if let Err(err) = task.run(message_bus).await { + run_error = Some(err); + } else { + return Ok(()); + } + } + } + } + } + Ok(()) => Ok(()), + } + } +} + /// A struct for managing groups of related [`BackgroundTasks`]. /// Tasks managed with a single instance of this struct must produce messages of the same type /// `MOut` and return errors convertible to `Err`. @@ -139,7 +201,7 @@ impl BackgroundTasks where Id: fmt::Debug + Hash + PartialEq + Eq + Clone + Unpin, Err: 'static + Send, - MOut: Send + Unpin, + MOut: 'static + Send + Unpin, { /// Registers a new background task in this struct. Returns a [`TaskSender`] that can be used to /// send messages to the task. Dropping this sender will close the channel of messages /// # Panics /// /// This method panics when attempting to register a task with a duplicate id.
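To make the restart contract above concrete, here is a minimal sketch (not part of this diff) of a task opting into restarts, assuming the `BackgroundTask`, `RestartableBackgroundTask`, and `MessageBus` items from this module; `EchoTask`, its message types, and the retry budget are hypothetical:

use std::ops::ControlFlow;

struct EchoTask {
    attempts_left: u32,
}

impl BackgroundTask for EchoTask {
    type Error = String;
    type MessageIn = u32;
    type MessageOut = u32;

    async fn run(&mut self, message_bus: &mut MessageBus<Self>) -> Result<(), Self::Error> {
        // Echo every message back to the parent until the channel closes.
        while let Some(msg) = message_bus.recv().await {
            message_bus.send(msg).await;
        }
        Ok(())
    }
}

impl RestartableBackgroundTask for EchoTask {
    async fn restart(
        &mut self,
        run_error: Self::Error,
        _message_bus: &mut MessageBus<Self>,
    ) -> ControlFlow<Self::Error> {
        // `Continue(())` makes the wrapper call `run` again; `Break` fails the task for good.
        if self.attempts_left == 0 {
            ControlFlow::Break(run_error)
        } else {
            self.attempts_left -= 1;
            ControlFlow::Continue(())
        }
    }
}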
- pub fn register(&mut self, task: T, id: Id, channel_size: usize) -> TaskSender + pub fn register(&mut self, mut task: T, id: Id, channel_size: usize) -> TaskSender where T: 'static + BackgroundTask + Send, Err: From, @@ -172,7 +234,7 @@ where StreamNotifyClose::new(ReceiverStream::new(out_msg_rx)), ); - let mut message_bus = MessageBus { + let mut message_bus = MessageBus:: { tx: out_msg_tx, rx: in_msg_rx, }; @@ -185,6 +247,27 @@ where TaskSender(in_msg_tx) } + pub fn register_restartable( + &mut self, + task: T, + id: Id, + channel_size: usize, + ) -> TaskSender> + where + T: 'static + RestartableBackgroundTask + Send, + Err: From, + T::MessageIn: Send, + T::Error: Send, + { + self.register(RestartableBackgroundTaskWrapper { task }, id, channel_size) + } + + pub fn clear(&mut self) { + for (id, _) in self.handles.drain() { + self.streams.remove(&id); + } + } + /// Returns the next update from one of registered tasks. pub async fn next(&mut self) -> Option<(Id, TaskUpdate)> { let (id, msg) = self.streams.next().await?; @@ -198,7 +281,10 @@ where .expect("task handles and streams are out of sync") .await; match res { - Err(..) => (id, TaskUpdate::Finished(Err(TaskError::Panic))), + Err(error) => { + tracing::error!(?error, "task panicked"); + (id, TaskUpdate::Finished(Err(TaskError::Panic))) + } Ok(res) => (id, TaskUpdate::Finished(res.map_err(TaskError::Error))), } } @@ -216,7 +302,10 @@ where let mut results = Vec::with_capacity(self.handles.len()); for (id, handle) in self.handles { let result = match handle.await { - Err(..) => Err(TaskError::Panic), + Err(error) => { + tracing::error!(?error, "task panicked"); + Err(TaskError::Panic) + } Ok(res) => res.map_err(TaskError::Error), }; results.push((id, result)); diff --git a/mirrord/intproxy/src/layer_conn.rs b/mirrord/intproxy/src/layer_conn.rs index 0cdd1c792a0..22178e39e0a 100644 --- a/mirrord/intproxy/src/layer_conn.rs +++ b/mirrord/intproxy/src/layer_conn.rs @@ -51,7 +51,7 @@ impl BackgroundTask for LayerConnection { type MessageIn = LocalMessage; type MessageOut = ProxyMessage; - async fn run(mut self, message_bus: &mut MessageBus) -> Result<(), CodecError> { + async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), CodecError> { loop { tokio::select! { res = self.layer_codec_rx.receive() => match res { diff --git a/mirrord/intproxy/src/layer_initializer.rs b/mirrord/intproxy/src/layer_initializer.rs index 0ca702b7d7d..ff793b81284 100644 --- a/mirrord/intproxy/src/layer_initializer.rs +++ b/mirrord/intproxy/src/layer_initializer.rs @@ -92,7 +92,7 @@ impl BackgroundTask for LayerInitializer { type MessageIn = (); type MessageOut = ProxyMessage; - async fn run(mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { + async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { loop { tokio::select! 
{ None = message_bus.recv() => { diff --git a/mirrord/intproxy/src/lib.rs b/mirrord/intproxy/src/lib.rs index 7b5944bc307..1162db0ccbb 100644 --- a/mirrord/intproxy/src/lib.rs +++ b/mirrord/intproxy/src/lib.rs @@ -2,7 +2,10 @@ #![warn(clippy::indexing_slicing)] #![deny(unused_crate_dependencies)] -use std::{collections::HashMap, time::Duration}; +use std::{ + collections::{HashMap, VecDeque}, + time::Duration, +}; use background_tasks::{BackgroundTasks, TaskSender, TaskUpdate}; use error::UnexpectedAgentMessage; @@ -18,12 +21,15 @@ use proxies::{ outgoing::{OutgoingProxy, OutgoingProxyMessage}, simple::{SimpleProxy, SimpleProxyMessage}, }; +use semver::Version; use tokio::{net::TcpListener, time}; use tracing::Level; use crate::{ - agent_conn::AgentConnection, background_tasks::TaskError, error::IntProxyError, - main_tasks::LayerClosed, + agent_conn::AgentConnection, + background_tasks::{RestartableBackgroundTaskWrapper, TaskError}, + error::IntProxyError, + main_tasks::{ConnectionRefresh, LayerClosed}, }; pub mod agent_conn; @@ -41,7 +47,7 @@ mod request_queue; struct TaskTxs { layers: HashMap>, _layer_initializer: TaskSender, - agent: TaskSender, + agent: TaskSender>, simple: TaskSender, outgoing: TaskSender, incoming: TaskSender, @@ -58,6 +64,13 @@ pub struct IntProxy { any_connection_accepted: bool, background_tasks: BackgroundTasks, task_txs: TaskTxs, + + /// [`mirrord_protocol`] version negotiated with the agent. + protocol_version: Option, + + /// Temporary message queue for any [`ProxyMessage`] from the layer or to the agent that is sent + /// during the reconnection state. + reconnect_task_queue: Option>, } impl IntProxy { @@ -77,8 +90,11 @@ impl IntProxy { let mut background_tasks: BackgroundTasks = Default::default(); - let agent = - background_tasks.register(agent_conn, MainTaskId::AgentConnection, Self::CHANNEL_SIZE); + let agent = background_tasks.register_restartable( + agent_conn, + MainTaskId::AgentConnection, + Self::CHANNEL_SIZE, + ); let layer_initializer = background_tasks.register( LayerInitializer::new(listener), MainTaskId::LayerInitializer, @@ -123,6 +139,8 @@ impl IntProxy { ping_pong, files, }, + protocol_version: None, + reconnect_task_queue: Default::default(), } } @@ -176,6 +194,15 @@ impl IntProxy { /// [`ProxyMessage::NewLayer`] is handled here, as an exception. async fn handle(&mut self, msg: ProxyMessage) -> Result<(), IntProxyError> { match msg { + ProxyMessage::NewLayer(_) | ProxyMessage::FromLayer(_) | ProxyMessage::ToAgent(_) + if self.reconnect_task_queue.is_some() => + { + // We are in the reconnect state, so this message should be queued. + self.reconnect_task_queue + .as_mut() + .expect("reconnect_task_queue should contain a value when in the reconnect state") + .push_back(msg); + } ProxyMessage::NewLayer(new_layer) => { self.any_connection_accepted = true; @@ -220,6 +247,7 @@ impl IntProxy { .await; } } + ProxyMessage::ConnectionRefresh(kind) => self.handle_connection_refresh(kind).await?, } Ok(()) } @@ -270,7 +298,7 @@ impl IntProxy { /// Routes most messages from the agent to the correct background task. /// Some messages are handled here.
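The reconnect queueing in `handle` above is essentially a small buffer-and-replay state machine; a standalone sketch of the idea (simplified types, hypothetical names):

use std::collections::VecDeque;

enum Event {
    Msg(String),
    ReconnectStart,
    ReconnectEnd,
}

struct Router {
    /// `Some` while a reconnect is in flight, `None` otherwise.
    queue: Option<VecDeque<String>>,
}

impl Router {
    fn handle(&mut self, event: Event, routed: &mut Vec<String>) {
        match event {
            // While reconnecting, park messages instead of routing them.
            Event::Msg(msg) => match self.queue.as_mut() {
                Some(queue) => queue.push_back(msg),
                None => routed.push(msg),
            },
            Event::ReconnectStart => {
                self.queue.get_or_insert_with(VecDeque::new);
            }
            // On success, replay everything that arrived while the agent was away, in order.
            Event::ReconnectEnd => {
                routed.extend(self.queue.take().into_iter().flatten());
            }
        }
    }
}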
- #[tracing::instrument(level = Level::TRACE, skip(self), ret)] + #[tracing::instrument(level = Level::TRACE, skip(self), err)] async fn handle_agent_message(&mut self, message: DaemonMessage) -> Result<(), IntProxyError> { match message { DaemonMessage::Pong => self.task_txs.ping_pong.send(AgentSentPong).await, @@ -312,6 +340,8 @@ impl IntProxy { .await } DaemonMessage::SwitchProtocolVersionResponse(protocol_version) => { + let _ = self.protocol_version.insert(protocol_version.clone()); + if CLIENT_READY_FOR_LOGS.matches(&protocol_version) { self.task_txs.agent.send(ClientMessage::ReadyForLogs).await; } @@ -355,7 +385,7 @@ impl IntProxy { } /// Routes a message from the layer to the correct background task. - #[tracing::instrument(level = Level::TRACE, skip(self), ret)] + #[tracing::instrument(level = Level::TRACE, skip(self), err)] async fn handle_layer_message(&self, message: FromLayer) -> Result<(), IntProxyError> { let FromLayer { message_id, @@ -403,4 +433,58 @@ impl IntProxy { Ok(()) } + + #[tracing::instrument(level = Level::TRACE, skip(self), err)] + async fn handle_connection_refresh( + &mut self, + kind: ConnectionRefresh, + ) -> Result<(), IntProxyError> { + match kind { + ConnectionRefresh::Start => { + // Initialise the default reconnect message queue. + self.reconnect_task_queue.get_or_insert_default(); + } + ConnectionRefresh::End => { + let Some(task_queue) = self.reconnect_task_queue.take() else { + return Err(IntProxyError::AgentFailed( + "unexpected state: agent reconnect finished without correctly initializing a reconnect" + .into(), + )); + }; + + self.task_txs + .agent + .send(ClientMessage::SwitchProtocolVersion( + self.protocol_version + .as_ref() + .unwrap_or(&mirrord_protocol::VERSION) + .clone(), + )) + .await; + + self.task_txs + .files + .send(FilesProxyMessage::ConnectionRefresh) + .await; + + self.task_txs + .incoming + .send(IncomingProxyMessage::ConnectionRefresh) + .await; + + Box::pin(async { + for msg in task_queue { + tracing::debug!(?msg, "dequeueing message for reconnect"); + + self.handle(msg).await? + } + + Ok::<(), IntProxyError>(()) + }) + .await? + } + } + + Ok(()) + } } diff --git a/mirrord/intproxy/src/main_tasks.rs b/mirrord/intproxy/src/main_tasks.rs index fd0e5d7e8e5..bb61c8ec90f 100644 --- a/mirrord/intproxy/src/main_tasks.rs +++ b/mirrord/intproxy/src/main_tasks.rs @@ -19,6 +19,8 @@ pub enum ProxyMessage { FromLayer(FromLayer), /// New layer instance to serve. NewLayer(NewLayer), + /// Connection to the agent was dropped and needs a reload. + ConnectionRefresh(ConnectionRefresh), } #[cfg(test)] @@ -136,3 +138,11 @@ pub struct LayerForked { pub struct LayerClosed { pub id: LayerId, } + +/// Notification about the start and end of a reconnection to the agent. +#[derive(Debug, Clone, Copy)] +#[cfg_attr(test, derive(PartialEq, Eq))] +pub enum ConnectionRefresh { + Start, + End, +} diff --git a/mirrord/intproxy/src/ping_pong.rs b/mirrord/intproxy/src/ping_pong.rs index f83f0c696a2..e7723677657 100644 --- a/mirrord/intproxy/src/ping_pong.rs +++ b/mirrord/intproxy/src/ping_pong.rs @@ -66,7 +66,7 @@ impl BackgroundTask for PingPong { /// /// When the time comes to ping the agent and the previous ping was not answered, this task /// exits with an error. - async fn run(mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { + async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { loop { tokio::select!
{ _ = self.ticker.tick() => { diff --git a/mirrord/intproxy/src/proxies/files.rs b/mirrord/intproxy/src/proxies/files.rs index 55c1b2f98f0..f195a87e694 100644 --- a/mirrord/intproxy/src/proxies/files.rs +++ b/mirrord/intproxy/src/proxies/files.rs @@ -1,15 +1,13 @@ use core::fmt; -use std::{collections::HashMap, vec}; +use std::{ + collections::{HashMap, VecDeque}, + vec, +}; use mirrord_intproxy_protocol::{LayerId, MessageId, ProxyToLayerMessage}; use mirrord_protocol::{ - file::{ - CloseDirRequest, CloseFileRequest, DirEntryInternal, ReadDirBatchRequest, ReadDirResponse, - ReadFileResponse, ReadLimitedFileRequest, SeekFromInternal, MKDIR_VERSION, - READDIR_BATCH_VERSION, READLINK_VERSION, RMDIR_VERSION, STATFS_VERSION, - }, - ClientMessage, DaemonMessage, ErrorKindInternal, FileRequest, FileResponse, RemoteIOError, - ResponseError, + file::*, ClientMessage, DaemonMessage, ErrorKindInternal, FileRequest, FileResponse, + RemoteIOError, ResponseError, }; use semver::Version; use thiserror::Error; @@ -23,6 +21,107 @@ use crate::{ request_queue::RequestQueue, }; +fn agent_lost_io_error() -> ResponseError { + ResponseError::RemoteIO(RemoteIOError { + raw_os_error: None, + kind: ErrorKindInternal::Unknown("connection with mirrord-agent was lost".to_string()), + }) +} + +macro_rules! dummy_file_response { + ($name: ident) => { + FileResponse::$name(Err(ResponseError::NotImplemented)) + }; +} + +/// Lightweight (no allocations) [`FileResponse`] to be returned when connection with the +/// mirrord-agent is lost. Must be converted into real [`FileResponse`] via [`From`]. +pub struct AgentLostFileResponse(LayerId, MessageId, FileResponse); + +impl From for ToLayer { + fn from(value: AgentLostFileResponse) -> Self { + let AgentLostFileResponse(layer_id, message_id, response) = value; + let error = agent_lost_io_error(); + + let real_response = match response { + FileResponse::Access(..) => FileResponse::Access(Err(error)), + FileResponse::GetDEnts64(..) => FileResponse::GetDEnts64(Err(error)), + FileResponse::Open(..) => FileResponse::Open(Err(error)), + FileResponse::OpenDir(..) => FileResponse::OpenDir(Err(error)), + FileResponse::Read(..) => FileResponse::Read(Err(error)), + FileResponse::ReadDir(..) => FileResponse::ReadDir(Err(error)), + FileResponse::ReadDirBatch(..) => FileResponse::ReadDirBatch(Err(error)), + FileResponse::ReadLimited(..) => FileResponse::ReadLimited(Err(error)), + FileResponse::Seek(..) => FileResponse::Seek(Err(error)), + FileResponse::Write(..) => FileResponse::Write(Err(error)), + FileResponse::WriteLimited(..) => FileResponse::WriteLimited(Err(error)), + FileResponse::Xstat(..) => FileResponse::Xstat(Err(error)), + FileResponse::XstatFs(..) => FileResponse::XstatFs(Err(error)), + FileResponse::ReadLink(..) => FileResponse::ReadLink(Err(error)), + FileResponse::MakeDir(..) => FileResponse::MakeDir(Err(error)), + FileResponse::RemoveDir(..) => FileResponse::RemoveDir(Err(error)), + FileResponse::Unlink(..) => FileResponse::Unlink(Err(error)), + }; + + debug_assert_eq!( + std::mem::discriminant(&response), + std::mem::discriminant(&real_response), + ); + + ToLayer { + layer_id, + message_id, + message: ProxyToLayerMessage::File(real_response), + } + } +} + +/// Convenience trait for [`FileRequest`]. +trait FileRequestExt: Sized { + /// If this [`FileRequest`] requires a [`FileResponse`] from the agent, return corresponding + /// [`AgentLostFileResponse`]. 
+ fn agent_lost_response( + &self, + layer_id: LayerId, + message_id: MessageId, + ) -> Option; +} + +impl FileRequestExt for FileRequest { + fn agent_lost_response( + &self, + layer_id: LayerId, + message_id: MessageId, + ) -> Option { + let response = match self { + Self::Close(..) | Self::CloseDir(..) => return None, + Self::Access(..) => dummy_file_response!(Access), + Self::FdOpenDir(..) => dummy_file_response!(OpenDir), + Self::GetDEnts64(..) => dummy_file_response!(GetDEnts64), + Self::Open(..) => dummy_file_response!(Open), + Self::OpenRelative(..) => dummy_file_response!(Open), + Self::Read(..) => dummy_file_response!(Read), + Self::ReadDir(..) => dummy_file_response!(ReadDir), + Self::ReadDirBatch(..) => dummy_file_response!(ReadDirBatch), + Self::ReadLimited(..) => dummy_file_response!(ReadLimited), + Self::Seek(..) => dummy_file_response!(Seek), + Self::Write(..) => dummy_file_response!(Write), + Self::WriteLimited(..) => dummy_file_response!(WriteLimited), + Self::Xstat(..) => dummy_file_response!(Xstat), + Self::XstatFs(..) => dummy_file_response!(XstatFs), + Self::ReadLink(..) => dummy_file_response!(ReadLink), + Self::MakeDir(..) => dummy_file_response!(MakeDir), + Self::MakeDirAt(..) => dummy_file_response!(MakeDir), + Self::Unlink(..) => dummy_file_response!(Unlink), + Self::UnlinkAt(..) => dummy_file_response!(Unlink), + Self::RemoveDir(..) => dummy_file_response!(RemoveDir), + Self::StatFs(..) => dummy_file_response!(XstatFs), + }; + + Some(AgentLostFileResponse(layer_id, message_id, response)) + } +} + /// Messages handled by [`FilesProxy`]. #[derive(Debug)] pub enum FilesProxyMessage { @@ -36,6 +135,8 @@ pub enum FilesProxyMessage { LayerForked(LayerForked), /// Layer instance closed. LayerClosed(LayerClosed), + /// Agent connection was refreshed + ConnectionRefresh, } /// Error that can occur in [`FilesProxy`]. @@ -123,6 +224,164 @@ enum AdditionalRequestData { Other, } +/// Manages state of file operations. Remaps remote file descriptors and returns early +/// [`ResponseError`]s for [`FileRequest`]s related to invalidated (agent lost) descriptors. +/// Tracks state of outstanding [`FileRequest`]s to respond with errors in case the agent is lost. +#[derive(Default)] +pub struct RouterFileOps { + /// Highest file fd we've returned to the client (after remapping). + highest_user_facing_fd: Option, + /// Offset we need to add to every fd we receive from the mirrord-agent. + /// All lesser fds received from the clients are invalid (probably lost with previous + /// mirrord-agent responsible for file ops). + current_fd_offset: u64, + /// Prepared error responses to outstanding [`FileRequest`]s. + /// We must flush these when connection to the mirrord-agent is lost, otherwise the layer will + /// hang. + queued_error_responses: VecDeque, +} + +impl fmt::Debug for RouterFileOps { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RouterFileOps") + .field("highest_user_facing_fd", &self.highest_user_facing_fd) + .field("current_fd_offset", &self.current_fd_offset) + .finish() + } +} + +impl RouterFileOps { + /// Return a request to be sent to the agent ([`Ok`] variant) or + /// a response to be sent to the user ([`Err`] variant). + #[tracing::instrument(level = Level::TRACE, ret, err(level = Level::TRACE, Debug))] + pub fn map_request( + &mut self, + layer_id: LayerId, + message_id: MessageId, + mut request: FileRequest, + ) -> Result, ToLayer> { + match &mut request { + // These requests do not refer to any open remote fd. 
+ // It's safe to pass them as they are. + FileRequest::Open(..) + | FileRequest::Access(..) + | FileRequest::Xstat(XstatRequest { fd: None, .. }) + | FileRequest::ReadLink(..) + | FileRequest::MakeDir(..) + | FileRequest::Unlink(..) + | FileRequest::RemoveDir(..) + | FileRequest::StatFs(..) + | FileRequest::UnlinkAt(UnlinkAtRequest { dirfd: None, .. }) => {} + + // These requests do not require any response from the agent. + // We need to remap the fd, but if the fd is invalid we simply drop them. + FileRequest::Close(CloseFileRequest { fd: remote_fd }) + | FileRequest::CloseDir(CloseDirRequest { remote_fd }) => { + if *remote_fd < self.current_fd_offset { + return Ok(None); + } + + *remote_fd -= self.current_fd_offset; + } + + // These requests refer to an open remote fd and require a response from the agent. + // We need to remap the fd and respond with an error if the fd is invalid. + FileRequest::FdOpenDir(FdOpenDirRequest { remote_fd }) + | FileRequest::GetDEnts64(GetDEnts64Request { remote_fd, .. }) + | FileRequest::OpenRelative(OpenRelativeFileRequest { + relative_fd: remote_fd, + .. + }) + | FileRequest::Read(ReadFileRequest { remote_fd, .. }) + | FileRequest::ReadDir(ReadDirRequest { remote_fd, .. }) + | FileRequest::ReadDirBatch(ReadDirBatchRequest { remote_fd, .. }) + | FileRequest::ReadLimited(ReadLimitedFileRequest { remote_fd, .. }) + | FileRequest::Seek(SeekFileRequest { fd: remote_fd, .. }) + | FileRequest::Write(WriteFileRequest { fd: remote_fd, .. }) + | FileRequest::WriteLimited(WriteLimitedFileRequest { remote_fd, .. }) + | FileRequest::Xstat(XstatRequest { + fd: Some(remote_fd), + .. + }) + | FileRequest::XstatFs(XstatFsRequest { fd: remote_fd }) + | FileRequest::MakeDirAt(MakeDirAtRequest { + dirfd: remote_fd, .. + }) + | FileRequest::UnlinkAt(UnlinkAtRequest { + dirfd: Some(remote_fd), + .. + }) => { + if *remote_fd < self.current_fd_offset { + let error_response = request + .agent_lost_response(layer_id, message_id) + .expect("these requests require responses") + .into(); + return Err(error_response); + } + + *remote_fd -= self.current_fd_offset; + } + }; + + if let Some(response) = request.agent_lost_response(layer_id, message_id) { + self.queued_error_responses.push_back(response); + } + + Ok(Some(request)) + } + + /// Return a response to be sent to the client. + #[tracing::instrument(level = Level::TRACE, ret)] + pub fn map_response(&mut self, mut response: FileResponse) -> FileResponse { + match &mut response { + // These responses do not refer to any open remote fd. + FileResponse::Access(..) + | FileResponse::Read(..) + | FileResponse::ReadLimited(..) + | FileResponse::ReadDir(..) + | FileResponse::Seek(..) + | FileResponse::Write(..) + | FileResponse::WriteLimited(..) + | FileResponse::Xstat(..) + | FileResponse::XstatFs(..) + | FileResponse::GetDEnts64(Err(..)) + | FileResponse::Open(Err(..)) + | FileResponse::OpenDir(Err(..)) + | FileResponse::ReadDirBatch(Err(..)) + | FileResponse::ReadLink(..) + | FileResponse::MakeDir(..) + | FileResponse::Unlink(..) + | FileResponse::RemoveDir(..) => {} + + FileResponse::GetDEnts64(Ok(GetDEnts64Response { fd: remote_fd, .. })) + | FileResponse::Open(Ok(OpenFileResponse { fd: remote_fd })) + | FileResponse::OpenDir(Ok(OpenDirResponse { fd: remote_fd, .. })) + | FileResponse::ReadDirBatch(Ok(ReadDirBatchResponse { fd: remote_fd, .. 
})) => { + *remote_fd += self.current_fd_offset; + + self.highest_user_facing_fd = + std::cmp::max(self.highest_user_facing_fd, Some(*remote_fd)); + } + } + + self.queued_error_responses.pop_front(); + + response + } + + /// Notify this manager that the agent was lost. + /// Return messages to be sent to the user. + #[tracing::instrument(level = Level::TRACE)] + pub fn agent_lost(&mut self) -> impl Iterator { + self.current_fd_offset = self.highest_user_facing_fd.map(|fd| fd + 1).unwrap_or(0); + + std::mem::take(&mut self.queued_error_responses) + .into_iter() + .map(ToLayer::from) + .map(ProxyMessage::ToLayer) + } +} + /// For handling all file operations. /// Run as a [`BackgroundTask`]. /// @@ -169,6 +428,8 @@ pub struct FilesProxy { remote_dirs: RemoteResources, /// Locally stored data of buffered directories. buffered_dirs: HashMap, + + reconnect_tracker: RouterFileOps, } impl fmt::Debug for FilesProxy { @@ -180,6 +441,7 @@ impl fmt::Debug for FilesProxy { .field("buffered_dirs", &self.buffered_dirs) .field("protocol_version", &self.protocol_version) .field("request_queue", &self.request_queue) + .field("reconnect_tracker", &self.reconnect_tracker) .finish() } } @@ -206,6 +468,8 @@ impl FilesProxy { remote_dirs: Default::default(), buffered_dirs: Default::default(), + + reconnect_tracker: Default::default(), } } @@ -761,6 +1025,24 @@ impl FilesProxy { Ok(()) } + + async fn handle_reconnect(&mut self, message_bus: &mut MessageBus) { + for (_, fds) in self.remote_files.drain() { + for fd in fds { + self.buffered_files.remove(&fd); + } + } + + for (_, fds) in self.remote_dirs.drain() { + for fd in fds { + self.buffered_dirs.remove(&fd); + } + } + + for response in self.reconnect_tracker.agent_lost() { + message_bus.send(response).await; + } + } } impl BackgroundTask for FilesProxy { @@ -768,16 +1050,28 @@ impl BackgroundTask for FilesProxy { type MessageOut = ProxyMessage; type Error = FilesProxyError; - async fn run(mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { + async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { while let Some(message) = message_bus.recv().await { tracing::trace!(?message, "new message in message_bus"); match message { FilesProxyMessage::FileReq(message_id, layer_id, request) => { - self.file_request(request, layer_id, message_id, message_bus) - .await; + match self + .reconnect_tracker + .map_request(layer_id, message_id, request) + { + Ok(None) => {} + Err(response) => { + message_bus.send(response).await; + } + Ok(Some(request)) => { + self.file_request(request, layer_id, message_id, message_bus) + .await + } + }; } FilesProxyMessage::FileRes(response) => { + let response = self.reconnect_tracker.map_response(response); self.file_response(response, message_bus).await?; } FilesProxyMessage::LayerClosed(closed) => { @@ -785,6 +1079,7 @@ impl BackgroundTask for FilesProxy { } FilesProxyMessage::LayerForked(forked) => self.layer_forked(forked), FilesProxyMessage::ProtocolVersion(version) => self.protocol_version(version), + FilesProxyMessage::ConnectionRefresh => self.handle_reconnect(message_bus).await, } } diff --git a/mirrord/intproxy/src/proxies/incoming.rs b/mirrord/intproxy/src/proxies/incoming.rs index 6ffa446dd4c..d0700afefe7 100644 --- a/mirrord/intproxy/src/proxies/incoming.rs +++ b/mirrord/intproxy/src/proxies/incoming.rs @@ -66,6 +66,7 @@ pub enum IncomingProxyMessage { AgentSteal(DaemonTcp), /// Agent responded to [`ClientMessage::SwitchProtocolVersion`]. 
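Looking back at the `RouterFileOps` mapping completed above, a worked example of the offset arithmetic, using hypothetical numbers:

fn fd_remap_example() {
    // Before the agent was lost the user saw remote fds 0..=4, so
    // `highest_user_facing_fd` was Some(4) and `agent_lost` set the offset to 5.
    let current_fd_offset: u64 = 5;

    // Response path: the new agent answers `Open` with fd 0;
    // `map_response` exposes it to the layer as fd 5.
    assert_eq!(0 + current_fd_offset, 5);

    // Request path: layer fd 3 predates the reconnect (3 < 5), so `map_request`
    // answers immediately with an `agent_lost_io_error` instead of forwarding.
    assert!(3 < current_fd_offset);

    // Layer fd 6 is valid and maps back to the new agent's fd 1 (6 - 5).
    assert_eq!(6 - current_fd_offset, 1);
}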
AgentProtocolVersion(semver::Version), + ConnectionRefresh, } /// Handle to a running [`HttpGatewayTask`]. @@ -533,6 +534,20 @@ impl IncomingProxy { IncomingProxyMessage::AgentProtocolVersion(version) => { self.response_mode = ResponseMode::from(&version); } + + IncomingProxyMessage::ConnectionRefresh => { + self.mirror_tcp_proxies.clear(); + self.steal_tcp_proxies.clear(); + self.tasks.clear(); + + for subscription in self.subscriptions.iter_mut() { + tracing::debug!(?subscription, "resubscribing"); + + message_bus + .send(ProxyMessage::ToAgent(subscription.resubscribe_message())) + .await + } + } } Ok(()) @@ -701,7 +716,7 @@ impl BackgroundTask for IncomingProxy { type MessageOut = ProxyMessage; #[tracing::instrument(level = Level::TRACE, name = "incoming_proxy_main_loop", skip_all, err)] - async fn run(mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { + async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { loop { tokio::select! { msg = message_bus.recv() => match msg { diff --git a/mirrord/intproxy/src/proxies/incoming/http_gateway.rs b/mirrord/intproxy/src/proxies/incoming/http_gateway.rs index e7f8a7819b0..16f1b166820 100644 --- a/mirrord/intproxy/src/proxies/incoming/http_gateway.rs +++ b/mirrord/intproxy/src/proxies/incoming/http_gateway.rs @@ -301,7 +301,7 @@ impl BackgroundTask for HttpGatewayTask { type MessageOut = InProxyTaskMessage; #[tracing::instrument(level = Level::TRACE, name = "http_gateway_task_main_loop", skip(message_bus))] - async fn run(self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { + async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { let mut backoffs = Backoff::new(10, Duration::from_millis(50), Duration::from_millis(500)).into_iter(); let guard = message_bus.closed(); diff --git a/mirrord/intproxy/src/proxies/incoming/subscriptions.rs b/mirrord/intproxy/src/proxies/incoming/subscriptions.rs index 7731707d8c8..5f41ce0ca1a 100644 --- a/mirrord/intproxy/src/proxies/incoming/subscriptions.rs +++ b/mirrord/intproxy/src/proxies/incoming/subscriptions.rs @@ -25,7 +25,7 @@ struct Source { /// Represents a port subscription in the agent. #[derive(Debug)] -struct Subscription { +pub struct Subscription { /// Previous sources of this subscription. Each of these was at some point active, but was /// later overwritten. queued_sources: Vec, @@ -144,6 +144,12 @@ impl Subscription { )), } } + + pub fn resubscribe_message(&mut self) -> ClientMessage { + self.confirmed = false; + + self.active_source.request.subscription.agent_subscribe() + } } /// Manages port subscriptions across all connected layers. @@ -306,6 +312,10 @@ impl SubscriptionsManager { pub fn layer_forked(&mut self, parent: LayerId, child: LayerId) { self.remote_ports.clone_all(parent, child); } + + pub fn iter_mut(&mut self) -> impl Iterator + '_ { + self.subscriptions.values_mut() + } } #[cfg(test)] diff --git a/mirrord/intproxy/src/proxies/incoming/tcp_proxy.rs b/mirrord/intproxy/src/proxies/incoming/tcp_proxy.rs index 6929f65b2f0..16330782a56 100644 --- a/mirrord/intproxy/src/proxies/incoming/tcp_proxy.rs +++ b/mirrord/intproxy/src/proxies/incoming/tcp_proxy.rs @@ -41,7 +41,7 @@ pub enum LocalTcpConnection { #[derive(Debug)] pub struct TcpProxyTask { /// The local connection between this task and the user application. - connection: LocalTcpConnection, + connection: Option, /// Whether this task should silently discard data coming from the user application. 
/// /// The data is discarded only when the remote connection is mirrored. @@ -60,7 +60,7 @@ impl TcpProxyTask { /// application. pub fn new(connection: LocalTcpConnection, discard_data: bool) -> Self { Self { - connection, + connection: Some(connection), discard_data, } } @@ -72,8 +72,12 @@ impl BackgroundTask for TcpProxyTask { type MessageOut = InProxyTaskMessage; #[tracing::instrument(level = Level::TRACE, name = "tcp_proxy_task_main_loop", skip(message_bus), err(level = Level::WARN))] - async fn run(self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { - let mut stream = match self.connection { + async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { + let mut stream = match self + .connection + .take() + .expect("task should have a valid connection before run") + { LocalTcpConnection::FromTheStart { socket, peer } => { let Some(stream) = message_bus .closed() diff --git a/mirrord/intproxy/src/proxies/outgoing.rs b/mirrord/intproxy/src/proxies/outgoing.rs index d7782053aa9..1f0372b255b 100644 --- a/mirrord/intproxy/src/proxies/outgoing.rs +++ b/mirrord/intproxy/src/proxies/outgoing.rs @@ -234,7 +234,7 @@ impl BackgroundTask for OutgoingProxy { type MessageIn = OutgoingProxyMessage; type MessageOut = ProxyMessage; - async fn run(mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { + async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { loop { tokio::select! { msg = message_bus.recv() => match msg { diff --git a/mirrord/intproxy/src/proxies/outgoing/interceptor.rs b/mirrord/intproxy/src/proxies/outgoing/interceptor.rs index 12c729b6d6c..cc9f6aeabd5 100644 --- a/mirrord/intproxy/src/proxies/outgoing/interceptor.rs +++ b/mirrord/intproxy/src/proxies/outgoing/interceptor.rs @@ -12,14 +12,16 @@ use crate::{ /// Multiple instances are run as [`BackgroundTask`]s by one [`OutgoingProxy`](super::OutgoingProxy) /// to manage individual connections. pub struct Interceptor { - socket: PreparedSocket, + socket: Option, } impl Interceptor { /// Creates a new instance. This instance will use the provided [`PreparedSocket`] to accept the /// layer's connection and manage it. pub fn new(socket: PreparedSocket) -> Self { - Self { socket } + Self { + socket: Some(socket), + } } } @@ -42,8 +44,12 @@ impl BackgroundTask for Interceptor { /// /// 3. This implementation exits only when an error is encountered or the [`MessageBus`] is /// closed. 
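Since `run` now borrows tasks as `&mut self` (so they can be restarted) instead of consuming them, tasks holding a consume-once resource, like the sockets here and in `TcpProxyTask`, keep it in an `Option` and `take` it on first run; a standalone sketch of the pattern (hypothetical names):

struct OneShot {
    resource: Option<String>,
}

impl OneShot {
    fn run(&mut self) -> Result<(), &'static str> {
        // Move the resource out of `self`; a second `run` call observes `None`.
        let Some(resource) = self.resource.take() else {
            return Err("resource was already consumed by a previous run");
        };

        // ... use the owned `resource` here ...
        let _ = resource;
        Ok(())
    }
}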
- async fn run(self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { - let mut connected_socket = self.socket.accept().await?; + async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { + let Some(socket) = self.socket.take() else { + return Ok(()); + }; + + let mut connected_socket = socket.accept().await?; let mut reading_closed = false; loop { diff --git a/mirrord/intproxy/src/proxies/simple.rs b/mirrord/intproxy/src/proxies/simple.rs index 49e643b259a..6efa5416865 100644 --- a/mirrord/intproxy/src/proxies/simple.rs +++ b/mirrord/intproxy/src/proxies/simple.rs @@ -65,7 +65,7 @@ impl BackgroundTask for SimpleProxy { type MessageIn = SimpleProxyMessage; type MessageOut = ProxyMessage; - async fn run(mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { + async fn run(&mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { while let Some(msg) = message_bus.recv().await { tracing::trace!(?msg, "new message in message_bus"); diff --git a/mirrord/intproxy/src/remote_resources.rs b/mirrord/intproxy/src/remote_resources.rs index 055455b7818..645ca3fece2 100644 --- a/mirrord/intproxy/src/remote_resources.rs +++ b/mirrord/intproxy/src/remote_resources.rs @@ -123,4 +123,16 @@ where *self.counts.entry(resource).or_default() += 1; } } + + /// Removes all resources held by all layer instances. + /// Returns an [`Iterator`] of layers and remote files/folders that were removed. + /// + /// Should be used when the remote is lost and a restart is needed. + #[tracing::instrument(level = Level::TRACE, skip(self))] + pub(crate) fn drain(&mut self) -> impl '_ + Iterator)> { + let ids: Vec<_> = self.by_layer.keys().cloned().collect(); + + ids.into_iter() + .map(|id| (id, self.remove_all(id).collect())) + } } diff --git a/mirrord/kube/src/api/kubernetes/portforwarder.rs b/mirrord/kube/src/api/kubernetes/portforwarder.rs index bd5e6d8c983..893e4ee82da 100644 --- a/mirrord/kube/src/api/kubernetes/portforwarder.rs +++ b/mirrord/kube/src/api/kubernetes/portforwarder.rs @@ -162,16 +162,21 @@ impl SinglePortForwarder { loop { tokio::select!
{ - error = error_future.as_mut() => { + biased; - if retry_strategy.peek().is_none() || error.is_none() { - tracing::warn!(?connect_info, "finished retry strategy, closing connection"); + error = error_future.as_mut() => { + if let Some(error) = error { + tracing::warn!(?connect_info, %error, "error while performing port-forward"); + } else { + tracing::warn!(?connect_info, "port-forward connection closed without any error published in the relevant channel"); break; } - if let Some(error) = error { - tracing::warn!(?connect_info, %error, "error while performing port-forward, retrying"); + if retry_strategy.peek().is_none() { + tracing::warn!(?connect_info, "port-forward connection retry strategy has reached its limit of connection attempts, not attempting again."); + + break; } match create_portforward_streams(&pod_api, &connect_info, &mut retry_strategy).await { @@ -193,9 +198,11 @@ copy_result = tokio::io::copy_bidirectional(&mut stream, &mut sink) => { if let Err(error) = copy_result { tracing::error!(?connect_info, %error, "unable to copy_bidirectional agent stream to local sink"); - - break; + } else { + tracing::info!(?connect_info, "closed copy_bidirectional agent stream to local sink"); } + + break; } } } @@ -208,7 +215,8 @@ pub async fn retry_portforward( client: &Client, connect_info: AgentKubernetesConnectInfo, ) -> Result<(Box, SinglePortForwarder)> { - let (lhs, rhs) = tokio::io::duplex(4096); + // Use 1024 * 1024 to be identical to the [`kube` implementation](https://github.com/kube-rs/kube/blob/ecbdafc214538aadc78ec8447f2fa12d0057492b/kube-client/src/api/portforward.rs#L101). + let (lhs, rhs) = tokio::io::duplex(1024 * 1024); let port_forwarder = SinglePortForwarder::connect(client, connect_info, Box::new(rhs)).await?; diff --git a/mirrord/operator/src/client.rs b/mirrord/operator/src/client.rs index 26f3d66e5b1..8144c99a988 100644 --- a/mirrord/operator/src/client.rs +++ b/mirrord/operator/src/client.rs @@ -113,6 +113,9 @@ pub struct OperatorSession { /// Version of [`mirrord_protocol`] used by the operator. /// Used to create [`ConnectionWrapper`]. pub operator_protocol_version: Option, + + /// Version of the operator. + pub operator_version: Version, } impl fmt::Debug for OperatorSession { @@ -126,6 +129,7 @@ impl fmt::Debug for OperatorSession { &self.operator_license_fingerprint, ) .field("operator_protocol_version", &self.operator_protocol_version) + .field("operator_version", &self.operator_version) .finish() } } @@ -662,6 +666,7 @@ impl OperatorApi { .protocol_version .as_ref() .and_then(|version| version.parse().ok()), + operator_version: self.operator.spec.operator_version.clone(), }; let mut connection_subtask = progress.subtask("connecting to the target");
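For reference, a minimal sketch of how the retry strategy shared by both reconnect paths behaves, assuming tokio-retry 0.3; the connect target is hypothetical:

use std::io;
use tokio_retry::{
    strategy::{jitter, ExponentialBackoff},
    Retry,
};

async fn connect_with_retry() -> io::Result<()> {
    // `from_millis(50)` raises the base to the attempt number, so the raw delays
    // are 50ms, 2500ms, 125s, ...; `jitter` randomizes each one, and `take(10)`
    // bounds the number of attempts, mirroring the reconnect loop above.
    let strategy = ExponentialBackoff::from_millis(50).map(jitter).take(10);

    Retry::spawn(strategy, || async {
        tokio::net::TcpStream::connect("127.0.0.1:8080")
            .await
            .map(drop)
    })
    .await
}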