Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Server customize shutdown and signals to support embedding #493

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 88 additions & 21 deletions pingora-core/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod daemon;
#[cfg(unix)]
pub(crate) mod transfer_fd;

use async_trait::async_trait;
#[cfg(unix)]
use daemon::daemonize;
use log::{debug, error, info, warn};
Expand Down Expand Up @@ -59,6 +60,61 @@ pub type ShutdownWatch = watch::Receiver<bool>;
#[cfg(unix)]
pub type ListenFds = Arc<Mutex<Fds>>;

pub enum ShutdownSignal {
GracefulUpgrade,
GracefulTerminate,
FastShutdown,
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add doc comments for these public types.

#[async_trait]
pub trait ShutdownSignalWatch {
async fn recv(&self) -> ShutdownSignal;
}

#[cfg(unix)]
pub struct UnixShutdownSignalWatch;

#[cfg(unix)]
#[async_trait]
impl ShutdownSignalWatch for UnixShutdownSignalWatch {
async fn recv(&self) -> ShutdownSignal {
let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap();
let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap();
let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap();

tokio::select! {
_ = graceful_upgrade_signal.recv() => {
ShutdownSignal::GracefulUpgrade
},
_ = graceful_terminate_signal.recv() => {
ShutdownSignal::GracefulTerminate
},
_ = fast_shutdown_signal.recv() => {
ShutdownSignal::FastShutdown
},
}
}
}

pub struct RunArgs {
#[cfg(unix)]
pub shutdown_signal: Box<dyn ShutdownSignalWatch>,
}

impl Default for RunArgs {
#[cfg(unix)]
fn default() -> Self {
Self {
shutdown_signal: Box::new(UnixShutdownSignalWatch),
}
}

#[cfg(windows)]
fn default() -> Self {
Self {}
}
}

/// The server object
///
/// This object represents an entire pingora server process which may have multiple independent
Expand Down Expand Up @@ -87,43 +143,41 @@ pub struct Server {

impl Server {
#[cfg(unix)]
async fn main_loop(&self) -> ShutdownType {
async fn main_loop(&self, run_args: RunArgs) -> ShutdownType {
// waiting for exit signal
// TODO: there should be a signal handling function
let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap();
let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap();
let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap();
tokio::select! {
_ = fast_shutdown_signal.recv() => {
match run_args.shutdown_signal.recv().await {
ShutdownSignal::FastShutdown => {
info!("SIGINT received, exiting");
ShutdownType::Quick
},
_ = graceful_terminate_signal.recv() => {
}
ShutdownSignal::GracefulTerminate => {
// we receive a graceful terminate, all instances are instructed to stop
info!("SIGTERM received, gracefully exiting");
// graceful shutdown if there are listening sockets
info!("Broadcasting graceful shutdown");
match self.shutdown_watch.send(true) {
Ok(_) => { info!("Graceful shutdown started!"); }
Ok(_) => {
info!("Graceful shutdown started!");
}
Err(e) => {
error!("Graceful shutdown broadcast failed: {e}");
}
}
info!("Broadcast graceful shutdown complete");
ShutdownType::Graceful
}
_ = graceful_upgrade_signal.recv() => {
ShutdownSignal::GracefulUpgrade => {
// TODO: still need to select! on signals in case a fast shutdown is needed
// aka: move below to another task and only kick it off here
info!("SIGQUIT received, sending socks and gracefully exiting");
if let Some(fds) = &self.listen_fds {
let fds = fds.lock().await;
info!("Trying to send socks");
// XXX: this is blocking IO
match fds.send_to_sock(
self.configuration.as_ref().upgrade_sock.as_str())
{
Ok(_) => {info!("listener sockets sent");},
match fds.send_to_sock(self.configuration.as_ref().upgrade_sock.as_str()) {
Ok(_) => {
info!("listener sockets sent");
}
Err(e) => {
error!("Unable to send listener sockets to new process: {e}");
// sentry log error on fd send failure
Expand All @@ -135,7 +189,9 @@ impl Server {
info!("Broadcasting graceful shutdown");
// gracefully exiting
match self.shutdown_watch.send(true) {
Ok(_) => { info!("Graceful shutdown started!"); }
Ok(_) => {
info!("Graceful shutdown started!");
}
Err(e) => {
error!("Graceful shutdown broadcast failed: {e}");
// switch to fast shutdown
Expand All @@ -148,7 +204,7 @@ impl Server {
info!("No socks to send, shutting down.");
ShutdownType::Graceful
}
},
}
}
}

Expand Down Expand Up @@ -306,14 +362,25 @@ impl Server {
}
}

/// Start the server using [Self::run] and default [RunArgs].
#[allow(unused_mut)] // TODO: May not need to keep mut self in interface
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove the mut.

pub fn run_forever(mut self) -> ! {
info!("Server starting");

self.run(RunArgs::default());

info!("All runtimes exited, exiting now");
std::process::exit(0)
}

/// Start the server
///
/// This function will block forever until the server needs to quit. So this would be the last
/// function to call for this object.
///
/// Note: this function may fork the process for daemonization, so any additional threads created
/// before this function will be lost to any service logic once this function is called.
pub fn run_forever(mut self) -> ! {
pub fn run(mut self, run_args: RunArgs) {
info!("Server starting");

let conf = self.configuration.as_ref();
Expand Down Expand Up @@ -354,7 +421,9 @@ impl Server {
// Only work steal runtime can use block_on()
let server_runtime = Server::create_runtime("Server", 1, true);
#[cfg(unix)]
let shutdown_type = server_runtime.get_handle().block_on(self.main_loop());
let shutdown_type = server_runtime
.get_handle()
.block_on(self.main_loop(run_args));
#[cfg(windows)]
let shutdown_type = ShutdownType::Graceful;

Expand Down Expand Up @@ -394,8 +463,6 @@ impl Server {
error!("Failed to shutdown runtime: {:?}", e);
}
}
info!("All runtimes exited, exiting now");
std::process::exit(0)
}

fn create_runtime(name: &str, threads: usize, work_steal: bool) -> Runtime {
Expand Down
Loading