Skip to content

Commit

Permalink
Refactor Server run to support customizing shutdown signals and behav…
Browse files Browse the repository at this point in the history
…ior.

This change supports building a server that can run Pingora without it
calling std::process::exit during shutdown.
  • Loading branch information
kriswuollett committed Dec 13, 2024
1 parent e309436 commit 0ae925f
Showing 1 changed file with 88 additions and 21 deletions.
109 changes: 88 additions & 21 deletions pingora-core/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod daemon;
#[cfg(unix)]
pub(crate) mod transfer_fd;

use async_trait::async_trait;
#[cfg(unix)]
use daemon::daemonize;
use log::{debug, error, info, warn};
Expand Down Expand Up @@ -59,6 +60,61 @@ pub type ShutdownWatch = watch::Receiver<bool>;
#[cfg(unix)]
pub type ListenFds = Arc<Mutex<Fds>>;

pub enum ShutdownSignal {
GracefulUpgrade,
GracefulTerminate,
FastShutdown,
}

#[async_trait]
pub trait ShutdownSignalWatch {
async fn recv(&self) -> ShutdownSignal;
}

#[cfg(unix)]
pub struct UnixShutdownSignalWatch;

#[cfg(unix)]
#[async_trait]
impl ShutdownSignalWatch for UnixShutdownSignalWatch {
async fn recv(&self) -> ShutdownSignal {
let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap();
let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap();
let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap();

tokio::select! {
_ = graceful_upgrade_signal.recv() => {
ShutdownSignal::GracefulUpgrade
},
_ = graceful_terminate_signal.recv() => {
ShutdownSignal::GracefulTerminate
},
_ = fast_shutdown_signal.recv() => {
ShutdownSignal::FastShutdown
},
}
}
}

pub struct RunArgs {
#[cfg(unix)]
pub shutdown_signal: Box<dyn ShutdownSignalWatch>,
}

impl Default for RunArgs {
#[cfg(unix)]
fn default() -> Self {
Self {
shutdown_signal: Box::new(UnixShutdownSignalWatch),
}
}

#[cfg(windows)]
fn default() -> Self {
Self {}
}
}

/// The server object
///
/// This object represents an entire pingora server process which may have multiple independent
Expand Down Expand Up @@ -87,43 +143,41 @@ pub struct Server {

impl Server {
#[cfg(unix)]
async fn main_loop(&self) -> ShutdownType {
async fn main_loop(&self, run_args: RunArgs) -> ShutdownType {
// waiting for exit signal
// TODO: there should be a signal handling function
let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap();
let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap();
let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap();
tokio::select! {
_ = fast_shutdown_signal.recv() => {
match run_args.shutdown_signal.recv().await {
ShutdownSignal::FastShutdown => {
info!("SIGINT received, exiting");
ShutdownType::Quick
},
_ = graceful_terminate_signal.recv() => {
}
ShutdownSignal::GracefulUpgrade => {
// we receive a graceful terminate, all instances are instructed to stop
info!("SIGTERM received, gracefully exiting");
// graceful shutdown if there are listening sockets
info!("Broadcasting graceful shutdown");
match self.shutdown_watch.send(true) {
Ok(_) => { info!("Graceful shutdown started!"); }
Ok(_) => {
info!("Graceful shutdown started!");
}
Err(e) => {
error!("Graceful shutdown broadcast failed: {e}");
}
}
info!("Broadcast graceful shutdown complete");
ShutdownType::Graceful
}
_ = graceful_upgrade_signal.recv() => {
ShutdownSignal::GracefulTerminate => {
// TODO: still need to select! on signals in case a fast shutdown is needed
// aka: move below to another task and only kick it off here
info!("SIGQUIT received, sending socks and gracefully exiting");
if let Some(fds) = &self.listen_fds {
let fds = fds.lock().await;
info!("Trying to send socks");
// XXX: this is blocking IO
match fds.send_to_sock(
self.configuration.as_ref().upgrade_sock.as_str())
{
Ok(_) => {info!("listener sockets sent");},
match fds.send_to_sock(self.configuration.as_ref().upgrade_sock.as_str()) {
Ok(_) => {
info!("listener sockets sent");
}
Err(e) => {
error!("Unable to send listener sockets to new process: {e}");
// sentry log error on fd send failure
Expand All @@ -135,7 +189,9 @@ impl Server {
info!("Broadcasting graceful shutdown");
// gracefully exiting
match self.shutdown_watch.send(true) {
Ok(_) => { info!("Graceful shutdown started!"); }
Ok(_) => {
info!("Graceful shutdown started!");
}
Err(e) => {
error!("Graceful shutdown broadcast failed: {e}");
// switch to fast shutdown
Expand All @@ -148,7 +204,7 @@ impl Server {
info!("No socks to send, shutting down.");
ShutdownType::Graceful
}
},
}
}
}

Expand Down Expand Up @@ -306,14 +362,25 @@ impl Server {
}
}

/// Start the server using [Self::run] and default [RunArgs].
#[allow(unused_mut)] // TODO: May not need to keep mut self in interface
pub fn run_forever(mut self) -> ! {
info!("Server starting");

self.run(RunArgs::default());

info!("All runtimes exited, exiting now");
std::process::exit(0)
}

/// Start the server
///
/// This function will block forever until the server needs to quit. So this would be the last
/// function to call for this object.
///
/// Note: this function may fork the process for daemonization, so any additional threads created
/// before this function will be lost to any service logic once this function is called.
pub fn run_forever(mut self) -> ! {
pub fn run(mut self, run_args: RunArgs) {
info!("Server starting");

let conf = self.configuration.as_ref();
Expand Down Expand Up @@ -354,7 +421,9 @@ impl Server {
// Only work steal runtime can use block_on()
let server_runtime = Server::create_runtime("Server", 1, true);
#[cfg(unix)]
let shutdown_type = server_runtime.get_handle().block_on(self.main_loop());
let shutdown_type = server_runtime
.get_handle()
.block_on(self.main_loop(run_args));
#[cfg(windows)]
let shutdown_type = ShutdownType::Graceful;

Expand Down Expand Up @@ -394,8 +463,6 @@ impl Server {
error!("Failed to shutdown runtime: {:?}", e);
}
}
info!("All runtimes exited, exiting now");
std::process::exit(0)
}

fn create_runtime(name: &str, threads: usize, work_steal: bool) -> Runtime {
Expand Down

0 comments on commit 0ae925f

Please sign in to comment.