Signal shutdown, use runtime shutdown to stop #753
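
For readers skimming the diff, the pattern named in the title (signal every task first, then let a single owner enforce one bounded stop) can be sketched without any dependencies. This is a minimal illustration under stated assumptions, not lading's code: it uses std threads and an `mpsc` channel in place of the tokio runtime, and `ShutdownFlag`, `signal_and_stop`, and `run_demo` are hypothetical names introduced only for this example.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for a shutdown signal: a shared flag that workers poll.
#[derive(Clone, Default)]
struct ShutdownFlag(Arc<AtomicBool>);

impl ShutdownFlag {
    fn signal(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    fn is_signaled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Signal shutdown, then wait at most `max_delay` for `workers` completion
/// messages. Returns how many workers reported completion before the deadline.
fn signal_and_stop(
    flag: &ShutdownFlag,
    done: &mpsc::Receiver<()>,
    workers: usize,
    max_delay: Duration,
) -> usize {
    flag.signal();
    let deadline = Instant::now() + max_delay;
    let mut finished = 0;
    while finished < workers {
        // One deadline for the whole stop, not a fresh timeout per worker.
        let remaining = deadline.saturating_duration_since(Instant::now());
        match done.recv_timeout(remaining) {
            Ok(()) => finished += 1,
            // Deadline reached, or every sender has already gone away.
            Err(_) => break,
        }
    }
    finished
}

/// Spawn three polling workers, then stop them with a two second budget.
fn run_demo() -> usize {
    let flag = ShutdownFlag::default();
    let (tx, rx) = mpsc::channel();
    for _ in 0..3 {
        let flag = flag.clone();
        let tx = tx.clone();
        thread::spawn(move || {
            while !flag.is_signaled() {
                thread::sleep(Duration::from_millis(5));
            }
            // Report completion; ignore the error if the receiver is gone.
            let _ = tx.send(());
        });
    }
    drop(tx);
    signal_and_stop(&flag, &rx, 3, Duration::from_secs(2))
}

fn main() {
    println!("{} workers stopped in time", run_demo());
}
```

Keeping the deadline in one place means the delay budget no longer has to be split between two waiters, which appears to be why the `/ 2` on `max_shutdown_delay` is dropped in `main`.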

Merged on Nov 30, 2023 (1 commit)
12 changes: 3 additions & 9 deletions lading/src/bin/lading.rs
@@ -246,7 +246,6 @@ fn get_config(ops: &Opts) -> Config {
 async fn inner_main(
     experiment_duration: Duration,
     warmup_duration: Duration,
-    max_shutdown_delay: Duration,
     disable_inspector: bool,
     config: Config,
 ) {
@@ -383,7 +382,7 @@ async fn inner_main(
             shutdown.signal().unwrap();
         },
         _ = experiment_sleep => {
-            info!("experiment duration exceeded");
+            info!("experiment duration exceeded, signaling for shutdown");
             shutdown.signal().unwrap();
         }
         res = tsrv => {
@@ -399,11 +398,7 @@ async fn inner_main(
             }
         }
     }
-    info!(
-        "Waiting for {} seconds for tasks to shutdown.",
-        max_shutdown_delay.as_secs(),
-    );
-    shutdown.wait(max_shutdown_delay).await;
+    drop(shutdown);
 }

 fn run_process_tree(opts: ProcessTreeGen) {
@@ -466,7 +461,7 @@ fn main() {
     let warmup_duration = Duration::from_secs(opts.warmup_duration_seconds.into());
     // The maximum shutdown delay is shared between `inner_main` and this
     // function, hence the divide by two.
-    let max_shutdown_delay = Duration::from_secs(opts.max_shutdown_delay.into()) / 2;
+    let max_shutdown_delay = Duration::from_secs(opts.max_shutdown_delay.into());
     let disable_inspector = opts.disable_inspector;

     let runtime = Builder::new_multi_thread()
@@ -477,7 +472,6 @@ fn main() {
     runtime.block_on(inner_main(
         experiment_duration,
         warmup_duration,
-        max_shutdown_delay,
         disable_inspector,
         config,
     ));
48 changes: 1 addition & 47 deletions lading/src/signals.rs
@@ -7,11 +7,7 @@

 use std::sync::Arc;

-use tokio::{
-    sync::broadcast,
-    time::{interval, Duration},
-};
-use tracing::{error, info};
+use tokio::sync::broadcast;

 #[derive(Debug)]
 /// Errors produced by [`Shutdown`]
@@ -86,48 +82,6 @@ impl Shutdown {
     pub fn signal(&self) -> Result<usize, Error> {
         self.sender.send(()).map_err(Error::Tokio)
     }
-
-    /// Wait for all `Shutdown` instances to properly shut down. This function
-    /// is safe to call from multiple instances of a `Shutdown`.
-    ///
-    /// # Panics
-    ///
-    /// None known.
-    pub async fn wait(self, max_delay: Duration) {
-        // Tidy up our own `notify`, avoiding a situation where we infinitely wait
-        // to shut down.
-        drop(self.notify);
-
-        let mut check_pulse = interval(Duration::from_secs(1));
-        let mut max_delay = interval(max_delay);
-        // Move past the first delay. If we fail to avoid this 0th interval the
-        // program shuts down with an error incorrectly.
-        max_delay.tick().await;
-
-        loop {
-            tokio::select! {
-                _ = check_pulse.tick() => {
-                    let remaining: usize = self.sender.receiver_count();
-                    if remaining == 0 {
-                        info!("all tasks shut down");
-                        return;
-                    }
-                    // For reasons that are obscure to me if we sleep here it's
-                    // _possible_ for the runtime to fully lock up when the splunk_heck
-                    // -- at least -- generator is running. See note below. This only
-                    // seems to happen if we have a single-threaded runtime or a low
-                    // number of worker threads available. I've reproduced the issue
-                    // reliably with 2.
-                    info!("waiting for {} tasks to shutdown", remaining);
-                }
-                _ = max_delay.tick() => {
-                    let remaining: usize = self.sender.receiver_count();
-                    error!("shutdown wait completing with {} remaining tasks", remaining);
-                    return;
-                }
-            }
-        }
-    }
 }

 impl Clone for Shutdown {