Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryDodzin committed Feb 2, 2025
1 parent 359f5fe commit 910d9b9
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 28 deletions.
17 changes: 5 additions & 12 deletions mirrord/intproxy/src/background_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl<T: BackgroundTask> MessageBus<T> {
}

/// Cast `&mut MessageBus<T>` as `&mut MessageBus<R>` only if they share the same message types
pub fn cast<R>(&mut self) -> &mut MessageBus<R>
pub(crate) fn cast<R>(&mut self) -> &mut MessageBus<R>
where
R: BackgroundTask<MessageIn = T::MessageIn, MessageOut = T::MessageOut>,
{
Expand Down Expand Up @@ -267,17 +267,10 @@ where
self.register(RestartableBackgroundTaskWrapper { task }, id, channel_size)
}

pub fn tasks_ids(&self) -> impl Iterator<Item = &Id> {
self.handles.keys()
}

pub async fn kill_task(&mut self, id: Id) {
self.streams.remove(&id);
let Some(task) = self.handles.remove(&id) else {
return;
};

task.abort();
pub fn clear(&mut self) {
for (id, _) in self.handles.drain() {
self.streams.remove(&id);
}
}

/// Returns the next update from one of registered tasks.
Expand Down
22 changes: 20 additions & 2 deletions mirrord/intproxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use proxies::{
outgoing::{OutgoingProxy, OutgoingProxyMessage},
simple::{SimpleProxy, SimpleProxyMessage},
};
use semver::Version;
use tokio::{net::TcpListener, time};
use tracing::Level;

Expand Down Expand Up @@ -60,6 +61,10 @@ pub struct IntProxy {
any_connection_accepted: bool,
background_tasks: BackgroundTasks<MainTaskId, ProxyMessage, IntProxyError>,
task_txs: TaskTxs,
/// [`mirrord_protocol`] version negotiated with the agent.
/// Determines whether we can use some messages, like [`FileRequest::ReadDirBatch`] or

Check failure on line 65 in mirrord/intproxy/src/lib.rs

View workflow job for this annotation

GitHub Actions / check-rust-docs

unresolved link to `FileRequest::ReadDirBatch`
/// [`FileRequest::ReadLink`].

Check failure on line 66 in mirrord/intproxy/src/lib.rs

View workflow job for this annotation

GitHub Actions / check-rust-docs

unresolved link to `FileRequest::ReadLink`
protocol_version: Option<Version>,
}

impl IntProxy {
Expand Down Expand Up @@ -128,6 +133,7 @@ impl IntProxy {
ping_pong,
files,
},
protocol_version: None,
}
}

Expand Down Expand Up @@ -318,6 +324,8 @@ impl IntProxy {
.await
}
DaemonMessage::SwitchProtocolVersionResponse(protocol_version) => {
let _ = self.protocol_version.insert(protocol_version.clone());

if CLIENT_READY_FOR_LOGS.matches(&protocol_version) {
self.task_txs.agent.send(ClientMessage::ReadyForLogs).await;
}
Expand Down Expand Up @@ -413,8 +421,18 @@ impl IntProxy {
#[tracing::instrument(level = Level::TRACE, skip(self), err)]
async fn handle_connection_refresh(&self) -> Result<(), IntProxyError> {
self.task_txs
.simple
.send(SimpleProxyMessage::ConnectionRefresh)
.agent
.send(ClientMessage::SwitchProtocolVersion(
self.protocol_version
.as_ref()
.unwrap_or(&mirrord_protocol::VERSION)
.clone(),
))
.await;

self.task_txs
.files
.send(FilesProxyMessage::ConnectionRefresh)
.await;

self.task_txs
Expand Down
17 changes: 17 additions & 0 deletions mirrord/intproxy/src/proxies/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub enum FilesProxyMessage {
LayerForked(LayerForked),
/// Layer instance closed.
LayerClosed(LayerClosed),
/// Agent connection was refreshed
ConnectionRefresh,
}

/// Error that can occur in [`FilesProxy`].
Expand Down Expand Up @@ -761,6 +763,20 @@ impl FilesProxy {

Ok(())
}

async fn handle_reconnect(&mut self, _message_bus: &mut MessageBus<Self>) {
for (_, fds) in self.remote_files.drain() {
for fd in fds {
self.buffered_files.remove(&fd);
}
}

for (_, fds) in self.remote_dirs.drain() {
for fd in fds {
self.buffered_dirs.remove(&fd);
}
}
}
}

impl BackgroundTask for FilesProxy {
Expand All @@ -785,6 +801,7 @@ impl BackgroundTask for FilesProxy {
}
FilesProxyMessage::LayerForked(forked) => self.layer_forked(forked),
FilesProxyMessage::ProtocolVersion(version) => self.protocol_version(version),
FilesProxyMessage::ConnectionRefresh => self.handle_reconnect(message_bus).await,
}
}

Expand Down
8 changes: 3 additions & 5 deletions mirrord/intproxy/src/proxies/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,11 +536,9 @@ impl IncomingProxy {
}

IncomingProxyMessage::ConnectionRefresh => {
let running_task_ids = self.tasks.tasks_ids().cloned().collect::<Vec<_>>();

for task in running_task_ids {
self.tasks.kill_task(task).await;
}
self.mirror_tcp_proxies.clear();
self.steal_tcp_proxies.clear();
self.tasks.clear();

for subscription in self.subscriptions.iter_mut() {
tracing::debug!(?subscription, "resubscribing");
Expand Down
9 changes: 0 additions & 9 deletions mirrord/intproxy/src/proxies/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ pub enum SimpleProxyMessage {
GetEnvRes(RemoteResult<HashMap<String, String>>),
/// Protocol version was negotiated with the agent.
ProtocolVersion(Version),
/// Agent connection was refreshed need to negotiate version
ConnectionRefresh,
}

#[derive(Error, Debug)]
Expand Down Expand Up @@ -125,13 +123,6 @@ impl BackgroundTask for SimpleProxy {
.await
}
SimpleProxyMessage::ProtocolVersion(version) => self.set_protocol_version(version),
SimpleProxyMessage::ConnectionRefresh => {
if let Some(version) = &self.protocol_version {
message_bus
.send(ClientMessage::SwitchProtocolVersion(version.clone()))
.await
}
}
}
}

Expand Down
12 changes: 12 additions & 0 deletions mirrord/intproxy/src/remote_resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,16 @@ where
*self.counts.entry(resource).or_default() += 1;
}
}

/// Removes all resources held all layers instances.
/// Returns an [`Iterator`] of layers and remote files/folders that were removed.
///
/// Should be used for when the remote is lost and there is a need to restart.
#[tracing::instrument(level = Level::TRACE, skip(self))]
pub(crate) fn drain(&mut self) -> impl '_ + Iterator<Item = (LayerId, Vec<T>)> {
let ids: Vec<_> = self.by_layer.keys().cloned().collect();

ids.into_iter()
.map(|id| (id, self.remove_all(id).collect()))
}
}

0 comments on commit 910d9b9

Please sign in to comment.