Skip to content

Commit

Permalink
Add TCP keepalive draft
Browse files Browse the repository at this point in the history
  • Loading branch information
sunsided committed Jan 6, 2024
1 parent 618205f commit ccf5041
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 2 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ semver = { version = "1.0.20", features = ["serde"] }
serde = { version = "1.0.193", features = ["derive"] }
serde_json = "1.0.108"
serde_yaml = "0.9.29"
socket2 = "0.5.5"
thiserror = "1.0.52"
which = "4.4.2"

Expand Down
41 changes: 41 additions & 0 deletions src/kubectl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
use crate::cli::KubectlPathBuf;
use crate::config::{ConfigId, OperationalConfig, PortForwardConfig, RetryDelay};
use serde::Deserialize;
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
use std::env::current_dir;
use std::io::{BufRead, Read};
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
use std::path::{Path, PathBuf};
use std::process::{Command, ExitStatus, Stdio};
use std::sync::mpsc::Sender;
use std::thread::JoinHandle;
use std::time::Duration;
use std::{io, process, thread};

#[cfg(not(windows))]
Expand Down Expand Up @@ -243,6 +246,44 @@ impl Kubectl {
StreamSource::StdErr,
);

// TODO: Add TCP keepalive for each port!
let port = fwd_config.ports[0];
let keepalive = thread::spawn(move || {
// TODO: Use fwd_config.listen_addrs to bind.
let port = port.local.unwrap_or(port.remote);
let mut addrs = format!("127.0.0.1:{port}")
.to_socket_addrs()
.expect("Failed to parse socket addresses");
let addr = addrs.next().expect("Failed to obtain socket address");
let addr = SockAddr::from(addr);
let stream = match Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))
{
Ok(socket) => {
socket.set_nodelay(true).expect("Failed to set TCP_NODELAY");
socket
.set_keepalive(true)
.expect("Failed to set SO_KEEPALIVE");
// TODO: stream.set_tcp_keepalive() ?
socket
.connect(&addr)
.expect("Failed to connect to socket address");
TcpStream::from(socket)
}
Err(_e) => {
return;
}
};

// TODO: Do something with the stream ... or not.
loop {
if let Ok(Some(e)) = stream.take_error() {
eprintln!("Error on TCP keepalive stream: {e}");
return;
}
thread::sleep(Duration::from_secs(10));
}
});

let mut child = ChildGuard(child);

// Wait for the child process to finish
Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@ fn start_output_loop_thread(out_rx: Receiver<ChildEvent>) -> JoinHandle<()> {
ChildEvent::Output(id, channel, message) => {
// TODO: use display name
match channel {
StreamSource::StdOut => println!("{id}: {message}"),
StreamSource::StdErr => eprintln!("{id}: {message}"),
StreamSource::StdOut => println!("{id}: kubectl: {message}"),
StreamSource::StdErr => eprintln!("{id}: kubectl: {message}"),
}
}
ChildEvent::Exit(id, status, policy) => {
Expand Down

0 comments on commit ccf5041

Please sign in to comment.