From 169149e4eed96fa6c76c448dc3e6f72d828e52b9 Mon Sep 17 00:00:00 2001 From: gengteng Date: Sat, 23 Mar 2024 18:58:20 +0800 Subject: [PATCH 1/2] feat: Implement a shutdown signal listener function and listen for rapid shutdown signals during the graceful upgrade process. --- pingora-core/src/server/mod.rs | 139 ++++++++++++++++++++++----------- 1 file changed, 94 insertions(+), 45 deletions(-) diff --git a/pingora-core/src/server/mod.rs b/pingora-core/src/server/mod.rs index 9c29aafe3..ebb590c50 100644 --- a/pingora-core/src/server/mod.rs +++ b/pingora-core/src/server/mod.rs @@ -77,22 +77,21 @@ pub struct Server { impl Server { async fn main_loop(&self) -> ShutdownType { // waiting for exit signal - // TODO: there should be a signal handling function - let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap(); - let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap(); - let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap(); - tokio::select! { - _ = fast_shutdown_signal.recv() => { + let shutdown_signal = wait_for_shutdown_signal().await; + match shutdown_signal { + ShutdownSignal::Fast => { info!("SIGINT received, exiting"); ShutdownType::Quick - }, - _ = graceful_terminate_signal.recv() => { + } + ShutdownSignal::GracefulTerminate => { // we receive a graceful terminate, all instances are instructed to stop info!("SIGTERM received, gracefully exiting"); // graceful shutdown if there are listening sockets info!("Broadcasting graceful shutdown"); match self.shutdown_watch.send(true) { - Ok(_) => { info!("Graceful shutdown started!"); } + Ok(_) => { + info!("Graceful shutdown started!"); + } Err(e) => { error!("Graceful shutdown broadcast failed: {e}"); } @@ -100,43 +99,51 @@ impl Server { info!("Broadcast graceful shutdown complete"); ShutdownType::Graceful } - _ = graceful_upgrade_signal.recv() => { - // TODO: still need to select! on signals in case a fast shutdown is needed - // aka: move below to another task and only kick it off here - info!("SIGQUIT received, sending socks and gracefully exiting"); - if let Some(fds) = &self.listen_fds { - let fds = fds.lock().await; - info!("Trying to send socks"); - // XXX: this is blocking IO - match fds.send_to_sock( - self.configuration.as_ref().upgrade_sock.as_str()) - { - Ok(_) => {info!("listener sockets sent");}, - Err(e) => { - error!("Unable to send listener sockets to new process: {e}"); - // sentry log error on fd send failure - #[cfg(not(debug_assertions))] - sentry::capture_error(&e); - } - } - sleep(Duration::from_secs(CLOSE_TIMEOUT)).await; - info!("Broadcasting graceful shutdown"); - // gracefully exiting - match self.shutdown_watch.send(true) { - Ok(_) => { info!("Graceful shutdown started!"); } - Err(e) => { - error!("Graceful shutdown broadcast failed: {e}"); - // switch to fast shutdown - return ShutdownType::Graceful; - } - } - info!("Broadcast graceful shutdown complete"); - ShutdownType::Graceful - } else { - info!("No socks to send, shutting down."); - ShutdownType::Graceful + ShutdownSignal::GracefulUpgrade => { + let mut wait_for_sig_int = unix::signal(unix::SignalKind::interrupt()) + .expect("Failed to create SIGINT listener."); + tokio::select! { + _ = wait_for_sig_int.recv() => {} + _ = self.graceful_upgrade() => {} } - }, + ShutdownType::Graceful + } + } + } + + async fn graceful_upgrade(&self) { + // aka: move below to another task and only kick it off here + info!("SIGQUIT received, sending socks and gracefully exiting"); + if let Some(fds) = &self.listen_fds { + let fds = fds.lock().await; + info!("Trying to send socks"); + // XXX: this is blocking IO + match fds.send_to_sock(self.configuration.as_ref().upgrade_sock.as_str()) { + Ok(_) => { + info!("listener sockets sent"); + } + Err(e) => { + error!("Unable to send listener sockets to new process: {e}"); + // sentry log error on fd send failure + #[cfg(not(debug_assertions))] + sentry::capture_error(&e); + } + } + sleep(Duration::from_secs(CLOSE_TIMEOUT)).await; + info!("Broadcasting graceful shutdown"); + // gracefully exiting + match self.shutdown_watch.send(true) { + Ok(_) => { + info!("Graceful shutdown started!"); + } + Err(e) => { + error!("Graceful shutdown broadcast failed: {e}"); + // switch to fast shutdown + } + } + info!("Broadcast graceful shutdown complete"); + } else { + info!("No socks to send, shutting down."); } } @@ -339,3 +346,45 @@ impl Server { } } } + +enum ShutdownSignal { + Fast, + GracefulTerminate, + GracefulUpgrade, +} + +async fn wait_for_shutdown_signal() -> ShutdownSignal { + let sig_int = async { + tokio::signal::ctrl_c() + .await + .expect("Failed to create SIGINT listener."); + }; + + #[cfg(unix)] + let sig_term = async { + unix::signal(unix::SignalKind::terminate()) + .expect("Failed to create SIGTERM listener.") + .recv() + .await; + }; + + #[cfg(unix)] + let sig_quit = async { + unix::signal(unix::SignalKind::quit()) + .expect("Failed to create SIGTERM listener.") + .recv() + .await; + }; + + #[cfg(not(unix))] + let sig_term = std::future::pending::<()>(); + + #[cfg(not(unix))] + let sig_quit = std::future::pending::<()>(); + + tokio::select! { + _ = sig_int => ShutdownSignal::Fast, + _ = sig_term => ShutdownSignal::GracefulTerminate, + _ = sig_quit => ShutdownSignal::GracefulUpgrade, + } +} From b6cc97e86037e7aea36538413d6c08b8414fdf05 Mon Sep 17 00:00:00 2001 From: GengTeng Date: Tue, 26 Mar 2024 11:56:26 +0800 Subject: [PATCH 2/2] Update mod.rs --- pingora-core/src/server/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pingora-core/src/server/mod.rs b/pingora-core/src/server/mod.rs index ebb590c50..58e984d8c 100644 --- a/pingora-core/src/server/mod.rs +++ b/pingora-core/src/server/mod.rs @@ -371,7 +371,7 @@ async fn wait_for_shutdown_signal() -> ShutdownSignal { #[cfg(unix)] let sig_quit = async { unix::signal(unix::SignalKind::quit()) - .expect("Failed to create SIGTERM listener.") + .expect("Failed to create SIGQUIT listener.") .recv() .await; };