-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor Server customize shutdown and signals to support embedding #493
Open
kriswuollett
wants to merge
2
commits into
cloudflare:main
Choose a base branch
from
kriswuollett:signal-handling-fn
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ mod daemon; | |
#[cfg(unix)] | ||
pub(crate) mod transfer_fd; | ||
|
||
use async_trait::async_trait; | ||
#[cfg(unix)] | ||
use daemon::daemonize; | ||
use log::{debug, error, info, warn}; | ||
|
@@ -59,6 +60,61 @@ pub type ShutdownWatch = watch::Receiver<bool>; | |
#[cfg(unix)] | ||
pub type ListenFds = Arc<Mutex<Fds>>; | ||
|
||
pub enum ShutdownSignal { | ||
GracefulUpgrade, | ||
GracefulTerminate, | ||
FastShutdown, | ||
} | ||
|
||
#[async_trait] | ||
pub trait ShutdownSignalWatch { | ||
async fn recv(&self) -> ShutdownSignal; | ||
} | ||
|
||
#[cfg(unix)] | ||
pub struct UnixShutdownSignalWatch; | ||
|
||
#[cfg(unix)] | ||
#[async_trait] | ||
impl ShutdownSignalWatch for UnixShutdownSignalWatch { | ||
async fn recv(&self) -> ShutdownSignal { | ||
let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap(); | ||
let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap(); | ||
let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap(); | ||
|
||
tokio::select! { | ||
_ = graceful_upgrade_signal.recv() => { | ||
ShutdownSignal::GracefulUpgrade | ||
}, | ||
_ = graceful_terminate_signal.recv() => { | ||
ShutdownSignal::GracefulTerminate | ||
}, | ||
_ = fast_shutdown_signal.recv() => { | ||
ShutdownSignal::FastShutdown | ||
}, | ||
} | ||
} | ||
} | ||
|
||
pub struct RunArgs { | ||
#[cfg(unix)] | ||
pub shutdown_signal: Box<dyn ShutdownSignalWatch>, | ||
} | ||
|
||
impl Default for RunArgs { | ||
#[cfg(unix)] | ||
fn default() -> Self { | ||
Self { | ||
shutdown_signal: Box::new(UnixShutdownSignalWatch), | ||
} | ||
} | ||
|
||
#[cfg(windows)] | ||
fn default() -> Self { | ||
Self {} | ||
} | ||
} | ||
|
||
/// The server object | ||
/// | ||
/// This object represents an entire pingora server process which may have multiple independent | ||
|
@@ -87,43 +143,41 @@ pub struct Server { | |
|
||
impl Server { | ||
#[cfg(unix)] | ||
async fn main_loop(&self) -> ShutdownType { | ||
async fn main_loop(&self, run_args: RunArgs) -> ShutdownType { | ||
// waiting for exit signal | ||
// TODO: there should be a signal handling function | ||
let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap(); | ||
let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap(); | ||
let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap(); | ||
tokio::select! { | ||
_ = fast_shutdown_signal.recv() => { | ||
match run_args.shutdown_signal.recv().await { | ||
ShutdownSignal::FastShutdown => { | ||
info!("SIGINT received, exiting"); | ||
ShutdownType::Quick | ||
}, | ||
_ = graceful_terminate_signal.recv() => { | ||
} | ||
ShutdownSignal::GracefulTerminate => { | ||
// we receive a graceful terminate, all instances are instructed to stop | ||
info!("SIGTERM received, gracefully exiting"); | ||
// graceful shutdown if there are listening sockets | ||
info!("Broadcasting graceful shutdown"); | ||
match self.shutdown_watch.send(true) { | ||
Ok(_) => { info!("Graceful shutdown started!"); } | ||
Ok(_) => { | ||
info!("Graceful shutdown started!"); | ||
} | ||
Err(e) => { | ||
error!("Graceful shutdown broadcast failed: {e}"); | ||
} | ||
} | ||
info!("Broadcast graceful shutdown complete"); | ||
ShutdownType::Graceful | ||
} | ||
_ = graceful_upgrade_signal.recv() => { | ||
ShutdownSignal::GracefulUpgrade => { | ||
// TODO: still need to select! on signals in case a fast shutdown is needed | ||
// aka: move below to another task and only kick it off here | ||
info!("SIGQUIT received, sending socks and gracefully exiting"); | ||
if let Some(fds) = &self.listen_fds { | ||
let fds = fds.lock().await; | ||
info!("Trying to send socks"); | ||
// XXX: this is blocking IO | ||
match fds.send_to_sock( | ||
self.configuration.as_ref().upgrade_sock.as_str()) | ||
{ | ||
Ok(_) => {info!("listener sockets sent");}, | ||
match fds.send_to_sock(self.configuration.as_ref().upgrade_sock.as_str()) { | ||
Ok(_) => { | ||
info!("listener sockets sent"); | ||
} | ||
Err(e) => { | ||
error!("Unable to send listener sockets to new process: {e}"); | ||
// sentry log error on fd send failure | ||
|
@@ -135,7 +189,9 @@ impl Server { | |
info!("Broadcasting graceful shutdown"); | ||
// gracefully exiting | ||
match self.shutdown_watch.send(true) { | ||
Ok(_) => { info!("Graceful shutdown started!"); } | ||
Ok(_) => { | ||
info!("Graceful shutdown started!"); | ||
} | ||
Err(e) => { | ||
error!("Graceful shutdown broadcast failed: {e}"); | ||
// switch to fast shutdown | ||
|
@@ -148,7 +204,7 @@ impl Server { | |
info!("No socks to send, shutting down."); | ||
ShutdownType::Graceful | ||
} | ||
}, | ||
} | ||
} | ||
} | ||
|
||
|
@@ -306,14 +362,25 @@ impl Server { | |
} | ||
} | ||
|
||
/// Start the server using [Self::run] and default [RunArgs]. | ||
#[allow(unused_mut)] // TODO: May not need to keep mut self in interface | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's remove the |
||
pub fn run_forever(mut self) -> ! { | ||
info!("Server starting"); | ||
|
||
self.run(RunArgs::default()); | ||
|
||
info!("All runtimes exited, exiting now"); | ||
std::process::exit(0) | ||
} | ||
|
||
/// Start the server | ||
/// | ||
/// This function will block forever until the server needs to quit. So this would be the last | ||
/// function to call for this object. | ||
/// | ||
/// Note: this function may fork the process for daemonization, so any additional threads created | ||
/// before this function will be lost to any service logic once this function is called. | ||
pub fn run_forever(mut self) -> ! { | ||
pub fn run(mut self, run_args: RunArgs) { | ||
info!("Server starting"); | ||
|
||
let conf = self.configuration.as_ref(); | ||
|
@@ -354,7 +421,9 @@ impl Server { | |
// Only work steal runtime can use block_on() | ||
let server_runtime = Server::create_runtime("Server", 1, true); | ||
#[cfg(unix)] | ||
let shutdown_type = server_runtime.get_handle().block_on(self.main_loop()); | ||
let shutdown_type = server_runtime | ||
.get_handle() | ||
.block_on(self.main_loop(run_args)); | ||
#[cfg(windows)] | ||
let shutdown_type = ShutdownType::Graceful; | ||
|
||
|
@@ -394,8 +463,6 @@ impl Server { | |
error!("Failed to shutdown runtime: {:?}", e); | ||
} | ||
} | ||
info!("All runtimes exited, exiting now"); | ||
std::process::exit(0) | ||
} | ||
|
||
fn create_runtime(name: &str, threads: usize, work_steal: bool) -> Runtime { | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add doc comments for these public types.