Skip to content

Commit

Permalink
Add a KeepAlive on connection between layer and agent (#104)
Browse files Browse the repository at this point in the history
* Add KeepAlive
  • Loading branch information
infiniteregrets authored Jun 3, 2022
1 parent 81a66f9 commit 26c0dc7
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Check [Keep a Changelog](http://keepachangelog.com/) for recommendations on how
### Added
- Prompt user to update if their version is outdated in the VS Code extension or CLI.
- Set `MIRRORD_CHECK_VERSION` to false to make E2E tests not read update messages.
- Add a keep-alive to keep the agent-pod from exiting, closes [#63](https://github.com/metalbear-co/mirrord/issues/63)

## 2.0.0

Expand Down
8 changes: 6 additions & 2 deletions mirrord-agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::{
sync::mpsc::{self},
};
use tokio_stream::StreamExt;
use tracing::{debug, error, info};
use tracing::{debug, error, info, trace};
use tracing_subscriber::prelude::*;

mod cli;
Expand Down Expand Up @@ -196,7 +196,11 @@ async fn start() -> Result<()> {
ClientMessage::ConnectionUnsubscribe(connection_id) => {
state.connections_subscriptions.unsubscribe(message.peer_id, connection_id);
}

ClientMessage::Ping => {
trace!("peer id {:?} sent ping", &message.peer_id);
let peer = state.peers.get(&message.peer_id).unwrap();
peer.channel.send(DaemonMessage::Pong).await?;
}

}
},
Expand Down
24 changes: 22 additions & 2 deletions mirrord-layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ use tokio::{
select,
sync::mpsc::{channel, Receiver, Sender},
task,
time::{sleep, Duration},
};
use tracing::{debug, error, info};
use tracing::{debug, error, info, trace};

mod common;
mod config;
Expand Down Expand Up @@ -150,6 +151,7 @@ async fn handle_daemon_message(
daemon_message: DaemonMessage,
port_mapping: &mut HashMap<Port, ListenData>,
active_connections: &mut HashMap<u16, Sender<TcpTunnelMessages>>,
ping: &mut bool,
) {
match daemon_message {
DaemonMessage::NewTCPConnection(conn) => {
Expand Down Expand Up @@ -207,6 +209,14 @@ async fn handle_daemon_message(
active_connections.remove(&msg.connection_id);
}
}
DaemonMessage::Pong => {
if *ping {
*ping = false;
trace!("Daemon sent pong!");
} else {
panic!("Daemon: unmatched pong!");
}
}
DaemonMessage::Close => todo!(),
DaemonMessage::LogMessage(_) => todo!(),
}
Expand All @@ -220,13 +230,23 @@ async fn poll_agent(mut pf: Portforwarder, mut receiver: Receiver<HookMessage>)
let mut codec = actix_codec::Framed::new(port, ClientCodec::new());
let mut port_mapping: HashMap<Port, ListenData> = HashMap::new();
let mut active_connections = HashMap::new();
let mut ping = false;
loop {
select! {
hook_message = receiver.recv() => {
handle_hook_message(hook_message.unwrap(), &mut port_mapping, &mut codec).await;
}
daemon_message = codec.next() => {
handle_daemon_message(daemon_message.unwrap().unwrap(), &mut port_mapping, &mut active_connections).await;
handle_daemon_message(daemon_message.unwrap().unwrap(), &mut port_mapping, &mut active_connections, &mut ping).await;
}
_ = sleep(Duration::from_secs(60)) => {
if !ping {
codec.send(ClientMessage::Ping).await.unwrap();
trace!("sent ping to daemon");
ping = true;
} else {
panic!("Client: unmatched ping");
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions mirrord-protocol/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub enum ClientMessage {
PortSubscribe(Vec<u16>),
Close,
ConnectionUnsubscribe(ConnectionID),
Ping,
}

#[derive(Encode, Decode, Debug, PartialEq, Eq, Clone)]
Expand All @@ -45,6 +46,7 @@ pub enum DaemonMessage {
TCPData(TCPData),
TCPClose(TCPClose),
LogMessage(LogMessage),
Pong,
}

pub struct ClientCodec {
Expand Down

0 comments on commit 26c0dc7

Please sign in to comment.