
Mirrord connection reconnect #3000

Open
wants to merge 30 commits into base: main
Commits
a797c18
Naive fix?
DmitryDodzin Jan 2, 2025
9101e84
Docs
DmitryDodzin Jan 2, 2025
2353fbb
x-reconnect-id
DmitryDodzin Jan 21, 2025
c654410
Current attempt
DmitryDodzin Jan 22, 2025
3db1d89
Cleanup not needed
DmitryDodzin Jan 23, 2025
cf17c0c
A tiny bit more cleanup
DmitryDodzin Jan 23, 2025
dde8e56
Changelog
DmitryDodzin Jan 23, 2025
3b64951
Merge remote-tracking branch 'metalbear-co/main' into dimad/mbe-510-p…
DmitryDodzin Jan 23, 2025
6185206
Merge branch 'main' into dimad/mbe-510-proxy-randomly-closes-during-c…
DmitryDodzin Jan 23, 2025
91f2341
Merge remote-tracking branch 'metalbear-co/main' into dimad/mbe-510-p…
DmitryDodzin Jan 28, 2025
2393d06
Docs and bus -> message_bus
DmitryDodzin Jan 28, 2025
de738d4
Tiny
DmitryDodzin Jan 28, 2025
17e5c1d
Ops
DmitryDodzin Jan 28, 2025
ff822ee
Docs
DmitryDodzin Jan 28, 2025
4fe4849
Merge remote-tracking branch 'metalbear-co/main' into dimad/mbe-510-p…
DmitryDodzin Jan 28, 2025
377fe96
Merge remote-tracking branch 'metalbear-co/main' into dimad/mbe-510-p…
DmitryDodzin Jan 28, 2025
1e729b8
Send SwitchProtocolVersion on ConnectionRefresh
DmitryDodzin Jan 28, 2025
83bd575
Merge branch 'main' into dimad/mbe-510-proxy-randomly-closes-during-c…
DmitryDodzin Jan 28, 2025
359f5fe
Merge remote-tracking branch 'metalbear-co/main' into dimad/mbe-510-p…
DmitryDodzin Feb 2, 2025
910d9b9
Update
DmitryDodzin Feb 2, 2025
8c379a4
Update mirrord/kube/src/api/kubernetes/portforwarder.rs
DmitryDodzin Feb 2, 2025
48d4f7f
Resubscribe only active_source
DmitryDodzin Feb 2, 2025
d79e53c
Doc fix
DmitryDodzin Feb 2, 2025
abe8709
Initial update to fs
DmitryDodzin Feb 3, 2025
ffdc1b0
Remove unsafe
DmitryDodzin Feb 3, 2025
06f5ac8
Merge branch 'main' into dimad/mbe-510-proxy-randomly-closes-during-c…
DmitryDodzin Feb 3, 2025
2e2ccd7
Add Version Check
DmitryDodzin Feb 3, 2025
32998d3
Update reconnect to be sort of stateful with message queue
DmitryDodzin Feb 3, 2025
ea3f493
Merge branch 'main' into dimad/mbe-510-proxy-randomly-closes-during-c…
DmitryDodzin Feb 3, 2025
f922303
Merge branch 'main' into dimad/mbe-510-proxy-randomly-closes-during-c…
DmitryDodzin Feb 4, 2025
1 change: 1 addition & 0 deletions Cargo.lock

1 change: 1 addition & 0 deletions changelog.d/+2901.fixed.md
@@ -0,0 +1 @@
Add reconnection mechanism when using mirrord operator.
1 change: 1 addition & 0 deletions changelog.d/+update-single-portforward.changed.md
@@ -0,0 +1 @@
Add a naive update to our port forward wrapper to check the error channel first instead of randomly picking a branch in the `tokio::select!` implementation.
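The changelog entry describes a priority fix: `tokio::select!` polls ready branches in random order by default, so an error on the port-forward channel could be starved by data traffic. The sketch below models only the ordering idea with std types (`Event`, `next_event` are hypothetical names, not the PR's actual code, which uses `tokio::select!`):

```rust
// Minimal std-only sketch of "check the error channel first".
// The real fix lives in the tokio::select! branches of the port
// forward wrapper; this just demonstrates the priority semantics.
enum Event {
    Error(String),
    Data(u64),
}

/// Surface a pending error before any pending data, mirroring a select
/// that always polls the error branch first.
fn next_event(pending: &mut Vec<Event>) -> Option<Event> {
    if let Some(pos) = pending.iter().position(|e| matches!(e, Event::Error(_))) {
        return Some(pending.remove(pos));
    }
    pending.pop()
}

fn main() {
    let mut pending = vec![Event::Data(1), Event::Error("port-forward closed".into())];
    // The error is reported first even though the data arrived earlier.
    assert!(matches!(next_event(&mut pending), Some(Event::Error(_))));
    assert!(matches!(next_event(&mut pending), Some(Event::Data(1))));
}
```

In tokio itself, the equivalent is listing the error branch first under `biased;`, which disables the random ready-branch selection.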
1 change: 1 addition & 0 deletions mirrord/intproxy/Cargo.toml
@@ -37,6 +37,7 @@ hyper-util.workspace = true
http-body-util.workspace = true
bytes.workspace = true
rand.workspace = true
tokio-retry = "0.3"
tokio-rustls.workspace = true
rustls.workspace = true
rustls-pemfile.workspace = true
147 changes: 118 additions & 29 deletions mirrord/intproxy/src/agent_conn.rs
@@ -6,21 +6,20 @@ use std::{
io,
io::BufReader,
net::{IpAddr, SocketAddr},
ops::ControlFlow,
path::{Path, PathBuf},
sync::Arc,
sync::{Arc, LazyLock},
};

use mirrord_analytics::Reporter;
use mirrord_analytics::{NullReporter, Reporter};
use mirrord_config::LayerConfig;
use mirrord_kube::{
api::{
kubernetes::{AgentKubernetesConnectInfo, KubernetesAPI},
wrap_raw_connection,
},
api::{kubernetes::AgentKubernetesConnectInfo, wrap_raw_connection},
error::KubeApiError,
};
use mirrord_operator::client::{error::OperatorApiError, OperatorApi, OperatorSession};
use mirrord_protocol::{ClientMessage, DaemonMessage};
use semver::VersionReq;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::{
@@ -30,14 +29,26 @@ use tokio::{
mpsc::{Receiver, Sender},
},
};
use tokio_retry::{
strategy::{jitter, ExponentialBackoff},
Retry,
};
use tokio_rustls::TlsConnector;
use tracing::Level;

use crate::{
background_tasks::{BackgroundTask, MessageBus},
ProxyMessage,
background_tasks::{BackgroundTask, MessageBus, RestartableBackgroundTask},
main_tasks::{ConnectionRefresh, ProxyMessage},
};

mod portforward;

static OPERATOR_RETRY_VERSION: LazyLock<VersionReq> = LazyLock::new(|| {
">3.104.2"
.parse()
.expect("OPERATOR_RETRY_VERSION should be a valid VersionReq value")
});
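The static above gates the reconnect flow on the operator version satisfying `>3.104.2`. The real code relies on the `semver` crate's `VersionReq`; as an illustration only, the same strict-greater check can be expressed with plain tuple comparison (this hand-rolled `parse` ignores pre-release and build metadata, which `semver` handles properly):

```rust
// Std-only stand-in for `VersionReq::parse(">3.104.2")` matching, for
// illustration; the diff uses the `semver` crate instead.
fn parse(v: &str) -> Option<(u64, u64, u64)> {
    let mut it = v.split('.').map(|p| p.parse::<u64>().ok());
    Some((it.next()??, it.next()??, it.next()??))
}

/// True when `version` is strictly greater than 3.104.2, the bound the
/// diff's OPERATOR_RETRY_VERSION encodes.
fn supports_reconnect(version: &str) -> bool {
    // Tuple comparison is lexicographic: major, then minor, then patch.
    matches!(parse(version), Some(v) if v > (3, 104, 2))
}
```

So `3.104.3` and `3.105.0` enable the retry path, while `3.104.2` itself and anything older fall back to `ReconnectFlow::Break`.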

/// Errors that can occur when the internal proxy tries to establish a connection with the agent.
#[derive(Error, Debug)]
pub enum AgentConnectionError {
@@ -97,6 +108,17 @@ pub enum AgentConnectInfo {
DirectKubernetes(AgentKubernetesConnectInfo),
}

#[derive(Debug, Default)]
pub enum ReconnectFlow {
ConnectInfo {
config: LayerConfig,
connect_info: AgentConnectInfo,
},

#[default]
Break,
}

/// Handles logic of the `proxy <-> agent` connection as a [`BackgroundTask`].
///
/// # Note
@@ -107,6 +129,7 @@ pub enum AgentConnectInfo {
pub struct AgentConnection {
pub agent_tx: Sender<ClientMessage>,
pub agent_rx: Receiver<DaemonMessage>,
pub reconnect: ReconnectFlow,
}

impl AgentConnection {
@@ -117,11 +140,23 @@ impl AgentConnection {
connect_info: Option<AgentConnectInfo>,
analytics: &mut R,
) -> Result<Self, AgentConnectionError> {
let (agent_tx, agent_rx) = match connect_info {
let (agent_tx, agent_rx, reconnect) = match connect_info {
Some(AgentConnectInfo::Operator(session)) => {
let connection =
OperatorApi::connect_in_existing_session(config, session, analytics).await?;
(connection.tx, connection.rx)
OperatorApi::connect_in_existing_session(config, session.clone(), analytics)
.await?;
(
connection.tx,
connection.rx,
if OPERATOR_RETRY_VERSION.matches(&session.operator_version) {
ReconnectFlow::ConnectInfo {
config: config.clone(),
connect_info: AgentConnectInfo::Operator(session),
}
} else {
ReconnectFlow::default()
},
)
}

Some(AgentConnectInfo::ExternalProxy(proxy_addr)) => {
@@ -131,7 +166,7 @@

let stream = socket.connect(proxy_addr).await?;

if config.external_proxy.tls_enable
let (tx, rx) = if config.external_proxy.tls_enable
&& let (
Some(tls_certificate),
Some(client_tls_certificate),
@@ -140,8 +175,7 @@
config.external_proxy.tls_certificate.as_ref(),
config.internal_proxy.client_tls_certificate.as_ref(),
config.internal_proxy.client_tls_key.as_ref(),
)
{
) {
wrap_connection_with_tls(
stream,
proxy_addr.ip(),
@@ -152,20 +186,14 @@
.await?
} else {
wrap_raw_connection(stream)
}
};

(tx, rx, ReconnectFlow::default())
}

Some(AgentConnectInfo::DirectKubernetes(connect_info)) => {
let k8s_api = KubernetesAPI::create(config)
.await
.map_err(AgentConnectionError::Kube)?;

let stream = k8s_api
.create_connection_portforward(connect_info.clone())
.await
.map_err(AgentConnectionError::Kube)?;

wrap_raw_connection(stream)
let (tx, rx) = portforward::create_connection(config, connect_info.clone()).await?;
(tx, rx, ReconnectFlow::default())
}

None => {
@@ -174,18 +202,27 @@
.as_ref()
.ok_or(AgentConnectionError::NoConnectionMethod)?;
let stream = TcpStream::connect(address).await?;
wrap_raw_connection(stream)
let (tx, rx) = wrap_raw_connection(stream);
(tx, rx, ReconnectFlow::default())
}
};

Ok(Self { agent_tx, agent_rx })
Ok(Self {
agent_tx,
agent_rx,
reconnect,
})
}

pub async fn new_for_raw_address(address: SocketAddr) -> Result<Self, AgentConnectionError> {
let stream = TcpStream::connect(address).await?;
let (agent_tx, agent_rx) = wrap_raw_connection(stream);

Ok(Self { agent_tx, agent_rx })
Ok(Self {
agent_tx,
agent_rx,
reconnect: ReconnectFlow::Break,
})
}

#[tracing::instrument(level = Level::TRACE, name = "send_agent_message", skip(self), ret)]
@@ -205,7 +242,7 @@ impl BackgroundTask for AgentConnection {
type MessageIn = ClientMessage;
type MessageOut = ProxyMessage;

async fn run(mut self, message_bus: &mut MessageBus<Self>) -> Result<(), Self::Error> {
async fn run(&mut self, message_bus: &mut MessageBus<Self>) -> Result<(), Self::Error> {
loop {
tokio::select! {
msg = message_bus.recv() => match msg {
@@ -227,6 +264,58 @@
break Err(AgentChannelError);
}
Some(msg) => message_bus.send(ProxyMessage::FromAgent(msg)).await,
},
}
}
}
}

impl RestartableBackgroundTask for AgentConnection {
#[tracing::instrument(level = Level::TRACE, skip(self, message_bus), ret)]
async fn restart(
&mut self,
error: Self::Error,
message_bus: &mut MessageBus<Self>,
) -> ControlFlow<Self::Error> {
match &self.reconnect {
ReconnectFlow::Break => ControlFlow::Break(error),
ReconnectFlow::ConnectInfo {
config,
connect_info,
} => {
message_bus
.send(ProxyMessage::ConnectionRefresh(ConnectionRefresh::Start))
.await;

let retry_strategy = ExponentialBackoff::from_millis(50).map(jitter).take(10);

let connection = Retry::spawn(retry_strategy, || async move {
AgentConnection::new(
config,
connect_info.clone().into(),
&mut NullReporter::default(),
)
.await
.inspect_err(
|err| tracing::error!(error = ?err, "unable to connect to agent upon retry"),
)
})
.await;

match connection {
Ok(connection) => {
*self = connection;
message_bus
.send(ProxyMessage::ConnectionRefresh(ConnectionRefresh::End))
.await;

ControlFlow::Continue(())
}
Err(error) => {
tracing::error!(?error, "unable to reconnect agent");

ControlFlow::Break(AgentChannelError)
}
}
}
}
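The retry above is `ExponentialBackoff::from_millis(50).map(jitter).take(10)` from `tokio-retry`. My reading of that crate is that each step multiplies the current delay by the base (50 ms, 2500 ms, ...), with `jitter` randomizing each delay and `take(10)` bounding the attempts; treat the exact growth rule as an assumption. A std-only sketch of such a schedule, jitter omitted for determinism (`backoff_schedule` is a hypothetical helper, not part of the PR):

```rust
// Std-only sketch of an exponential retry schedule like the one built in
// `restart`. Assumes tokio-retry's semantics of multiplying the current
// delay by the base each step; jitter is left out so the output is
// deterministic. Delays are in milliseconds.
fn backoff_schedule(base_ms: u64, attempts: usize) -> Vec<u64> {
    let mut delays = Vec::with_capacity(attempts);
    let mut current = base_ms;
    for _ in 0..attempts {
        delays.push(current);
        // saturating_mul avoids overflow for large attempt counts.
        current = current.saturating_mul(base_ms);
    }
    delays
}
```

Capping at ten attempts keeps the proxy from hanging forever when the operator is truly gone, after which `restart` returns `ControlFlow::Break(AgentChannelError)` and the task shuts down.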
25 changes: 25 additions & 0 deletions mirrord/intproxy/src/agent_conn/portforward.rs
@@ -0,0 +1,25 @@
use mirrord_config::LayerConfig;
use mirrord_kube::api::{
kubernetes::{AgentKubernetesConnectInfo, KubernetesAPI},
wrap_raw_connection,
};
use mirrord_protocol::{ClientMessage, DaemonMessage};
use tokio::sync::mpsc;

use crate::agent_conn::AgentConnectionError;

pub async fn create_connection(
config: &LayerConfig,
connect_info: AgentKubernetesConnectInfo,
) -> Result<(mpsc::Sender<ClientMessage>, mpsc::Receiver<DaemonMessage>), AgentConnectionError> {
let k8s_api = KubernetesAPI::create(config)
.await
.map_err(AgentConnectionError::Kube)?;

let stream = k8s_api
.create_connection_portforward(connect_info.clone())
.await
.map_err(AgentConnectionError::Kube)?;

Ok(wrap_raw_connection(stream))
}