diff --git a/Cargo.lock b/Cargo.lock index 1840af8..175d7a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -256,6 +256,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "socket2", "thiserror", "which", ] @@ -415,6 +416,16 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "socket2" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" +dependencies = [ + "libc", + "windows-sys 0.48.0", +] + [[package]] name = "strsim" version = "0.10.0" diff --git a/Cargo.toml b/Cargo.toml index e548b51..bd9a1af 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ semver = { version = "1.0.20", features = ["serde"] } serde = { version = "1.0.193", features = ["derive"] } serde_json = "1.0.108" serde_yaml = "0.9.29" +socket2 = "0.5.5" thiserror = "1.0.52" which = "4.4.2" diff --git a/src/kubectl.rs b/src/kubectl.rs index 935b3fc..2b69ecf 100644 --- a/src/kubectl.rs +++ b/src/kubectl.rs @@ -5,12 +5,15 @@ use crate::cli::KubectlPathBuf; use crate::config::{ConfigId, OperationalConfig, PortForwardConfig, RetryDelay}; use serde::Deserialize; +use socket2::{Domain, Protocol, SockAddr, Socket, Type}; use std::env::current_dir; use std::io::{BufRead, Read}; +use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs}; use std::path::{Path, PathBuf}; use std::process::{Command, ExitStatus, Stdio}; use std::sync::mpsc::Sender; use std::thread::JoinHandle; +use std::time::Duration; use std::{io, process, thread}; #[cfg(not(windows))] @@ -243,6 +246,44 @@ impl Kubectl { StreamSource::StdErr, ); + // TODO: Add TCP keepalive for each port! + let port = fwd_config.ports[0]; + let keepalive = thread::spawn(move || { + // TODO: Use fwd_config.listen_addrs to bind. + let port = port.local.unwrap_or(port.remote); + let mut addrs = format!("127.0.0.1:{port}") + .to_socket_addrs() + .expect("Failed to parse socket addresses"); + let addr = addrs.next().expect("Failed to obtain socket address"); + let addr = SockAddr::from(addr); + let stream = match Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP)) + { + Ok(socket) => { + socket.set_nodelay(true).expect("Failed to set TCP_NODELAY"); + socket + .set_keepalive(true) + .expect("Failed to set SO_KEEPALIVE"); + // TODO: stream.set_tcp_keepalive() ? + socket + .connect(&addr) + .expect("Failed to connect to socket address"); + TcpStream::from(socket) + } + Err(_e) => { + return; + } + }; + + // TODO: Do something with the stream ... or not. + loop { + if let Ok(Some(e)) = stream.take_error() { + eprintln!("Error on TCP keepalive stream: {e}"); + return; + } + thread::sleep(Duration::from_secs(10)); + } + }); + let mut child = ChildGuard(child); // Wait for the child process to finish diff --git a/src/main.rs b/src/main.rs index 42d546c..3291658 100644 --- a/src/main.rs +++ b/src/main.rs @@ -238,8 +238,8 @@ fn start_output_loop_thread(out_rx: Receiver) -> JoinHandle<()> { ChildEvent::Output(id, channel, message) => { // TODO: use display name match channel { - StreamSource::StdOut => println!("{id}: {message}"), - StreamSource::StdErr => eprintln!("{id}: {message}"), + StreamSource::StdOut => println!("{id}: kubectl: {message}"), + StreamSource::StdErr => eprintln!("{id}: kubectl: {message}"), } } ChildEvent::Exit(id, status, policy) => {