From 26c0dc74077726fdad5d08eccdadff22a2735be3 Mon Sep 17 00:00:00 2001 From: Mehul <65443164+infiniteregrets@users.noreply.github.com> Date: Fri, 3 Jun 2022 06:44:02 -0400 Subject: [PATCH] Add a KeepAlive on connection between layer and agent (#104) * Add KeepAlive --- CHANGELOG.md | 1 + mirrord-agent/src/main.rs | 8 ++++++-- mirrord-layer/src/lib.rs | 24 ++++++++++++++++++++++-- mirrord-protocol/src/codec.rs | 2 ++ 4 files changed, 31 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6edae71b76e..bba35f3da8a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ Check [Keep a Changelog](http://keepachangelog.com/) for recommendations on how ### Added - Prompt user to update if their version is outdated in the VS Code extension or CLI. - Set `MIRRORD_CHECK_VERSION` to false to make E2E tests not read update messages. +- Add a keep-alive to keep the agent-pod from exiting, closes [#63](https://github.com/metalbear-co/mirrord/issues/63) ## 2.0.0 diff --git a/mirrord-agent/src/main.rs b/mirrord-agent/src/main.rs index 78a7db6034f..f9a7fc5615d 100644 --- a/mirrord-agent/src/main.rs +++ b/mirrord-agent/src/main.rs @@ -14,7 +14,7 @@ use tokio::{ sync::mpsc::{self}, }; use tokio_stream::StreamExt; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, trace}; use tracing_subscriber::prelude::*; mod cli; @@ -196,7 +196,11 @@ async fn start() -> Result<()> { ClientMessage::ConnectionUnsubscribe(connection_id) => { state.connections_subscriptions.unsubscribe(message.peer_id, connection_id); } - + ClientMessage::Ping => { + trace!("peer id {:?} sent ping", &message.peer_id); + let peer = state.peers.get(&message.peer_id).unwrap(); + peer.channel.send(DaemonMessage::Pong).await?; + } } }, diff --git a/mirrord-layer/src/lib.rs b/mirrord-layer/src/lib.rs index d9f385e863b..69b55ad974f 100644 --- a/mirrord-layer/src/lib.rs +++ b/mirrord-layer/src/lib.rs @@ -19,8 +19,9 @@ use tokio::{ select, sync::mpsc::{channel, Receiver, Sender}, task, + time::{sleep, Duration}, }; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, trace}; mod common; mod config; @@ -150,6 +151,7 @@ async fn handle_daemon_message( daemon_message: DaemonMessage, port_mapping: &mut HashMap, active_connections: &mut HashMap>, + ping: &mut bool, ) { match daemon_message { DaemonMessage::NewTCPConnection(conn) => { @@ -207,6 +209,14 @@ async fn handle_daemon_message( active_connections.remove(&msg.connection_id); } } + DaemonMessage::Pong => { + if *ping { + *ping = false; + trace!("Daemon sent pong!"); + } else { + panic!("Daemon: unmatched pong!"); + } + } DaemonMessage::Close => todo!(), DaemonMessage::LogMessage(_) => todo!(), } @@ -220,13 +230,23 @@ async fn poll_agent(mut pf: Portforwarder, mut receiver: Receiver) let mut codec = actix_codec::Framed::new(port, ClientCodec::new()); let mut port_mapping: HashMap = HashMap::new(); let mut active_connections = HashMap::new(); + let mut ping = false; loop { select! { hook_message = receiver.recv() => { handle_hook_message(hook_message.unwrap(), &mut port_mapping, &mut codec).await; } daemon_message = codec.next() => { - handle_daemon_message(daemon_message.unwrap().unwrap(), &mut port_mapping, &mut active_connections).await; + handle_daemon_message(daemon_message.unwrap().unwrap(), &mut port_mapping, &mut active_connections, &mut ping).await; + } + _ = sleep(Duration::from_secs(60)) => { + if !ping { + codec.send(ClientMessage::Ping).await.unwrap(); + trace!("sent ping to daemon"); + ping = true; + } else { + panic!("Client: unmatched ping"); + } } } } diff --git a/mirrord-protocol/src/codec.rs b/mirrord-protocol/src/codec.rs index 9206d8178d4..e6fcf5efdaa 100644 --- a/mirrord-protocol/src/codec.rs +++ b/mirrord-protocol/src/codec.rs @@ -36,6 +36,7 @@ pub enum ClientMessage { PortSubscribe(Vec), Close, ConnectionUnsubscribe(ConnectionID), + Ping, } #[derive(Encode, Decode, Debug, PartialEq, Eq, Clone)] @@ -45,6 +46,7 @@ pub enum DaemonMessage { TCPData(TCPData), TCPClose(TCPClose), LogMessage(LogMessage), + Pong, } pub struct ClientCodec {