diff --git a/Cargo.lock b/Cargo.lock index 95c8001a..101b327c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -462,7 +462,7 @@ dependencies = [ [[package]] name = "certifier" -version = "0.6.0" +version = "0.6.1" dependencies = [ "axum", "axum-prometheus", @@ -1547,7 +1547,7 @@ dependencies = [ [[package]] name = "initializer" -version = "0.6.0" +version = "0.6.1" dependencies = [ "base64 0.21.5", "clap 4.4.7", @@ -2317,7 +2317,7 @@ checksum = "3bccab0e7fd7cc19f820a1c8c91720af652d0c88dc9664dd72aef2614f04af3b" [[package]] name = "post-cbindings" -version = "0.6.0" +version = "0.6.1" dependencies = [ "cbindgen", "log", @@ -2328,7 +2328,7 @@ dependencies = [ [[package]] name = "post-rs" -version = "0.6.0" +version = "0.6.1" dependencies = [ "aes", "bitvec", @@ -2457,7 +2457,7 @@ dependencies = [ [[package]] name = "profiler" -version = "0.6.0" +version = "0.6.1" dependencies = [ "clap 4.4.7", "env_logger", @@ -3039,7 +3039,7 @@ dependencies = [ [[package]] name = "scrypt-ocl" -version = "0.6.0" +version = "0.6.1" dependencies = [ "log", "ocl", @@ -3210,7 +3210,7 @@ dependencies = [ [[package]] name = "service" -version = "0.6.0" +version = "0.6.1" dependencies = [ "async-stream", "clap 4.4.7", diff --git a/Cargo.toml b/Cargo.toml index 50663597..8e4d6cb3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ members = [ [package] name = "post-rs" -version = "0.6.0" +version = "0.6.1" edition = "2021" [lib] diff --git a/certifier/Cargo.toml b/certifier/Cargo.toml index 1565963e..c1a20eff 100644 --- a/certifier/Cargo.toml +++ b/certifier/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "certifier" -version = "0.6.0" +version = "0.6.1" edition = "2021" [dependencies] diff --git a/ffi/Cargo.toml b/ffi/Cargo.toml index 44d77838..63d4f5c3 100644 --- a/ffi/Cargo.toml +++ b/ffi/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "post-cbindings" -version = "0.6.0" +version = "0.6.1" edition = "2021" diff --git a/initializer/Cargo.toml b/initializer/Cargo.toml index 1fb6755c..141cc5e5 100644 --- a/initializer/Cargo.toml +++ b/initializer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "initializer" -version = "0.6.0" +version = "0.6.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/profiler/Cargo.toml b/profiler/Cargo.toml index 7e7a3cbe..b170bc0c 100644 --- a/profiler/Cargo.toml +++ b/profiler/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "profiler" -version = "0.6.0" +version = "0.6.1" edition = "2021" [dependencies] diff --git a/scrypt-ocl/Cargo.toml b/scrypt-ocl/Cargo.toml index bf1520bc..b4ae894a 100644 --- a/scrypt-ocl/Cargo.toml +++ b/scrypt-ocl/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "scrypt-ocl" -version = "0.6.0" +version = "0.6.1" edition = "2021" [dependencies] diff --git a/service/Cargo.toml b/service/Cargo.toml index 5aafbf74..be38f393 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "service" -version = "0.6.0" +version = "0.6.1" edition = "2021" [lib] diff --git a/service/src/main.rs b/service/src/main.rs index e0227f0f..03662863 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -3,6 +3,7 @@ use std::{fs::read_to_string, path::PathBuf, time::Duration}; use clap::{Args, Parser, ValueEnum}; use eyre::Context; use sysinfo::{Pid, ProcessExt, ProcessStatus, System, SystemExt}; +use tokio::sync::oneshot::{self, error::TryRecvError, Receiver}; use tonic::transport::{Certificate, Identity}; use post::pow::randomx::RandomXFlag; @@ -226,30 +227,34 @@ async fn main() -> eyre::Result<()> { let client = client::ServiceClient::new(args.address, tls, service)?; let client_handle = tokio::spawn(client.run(args.max_retries, args.reconnect_interval_s)); + // A channel to communicate when the blocking task should quit. + let (term_tx, term_rx) = oneshot::channel(); + tokio::select! { - Some(err) = watch_pid_if_needed(args.watch_pid) => { + Some(err) = watch_pid_if_needed(args.watch_pid.map(|p| (p, term_rx))) => { log::info!("PID watcher exited: {err:?}"); return Ok(()) } err = client_handle => { + drop(term_tx); return err.unwrap(); } } } async fn watch_pid_if_needed( - pid: Option, + watch: Option<(Pid, Receiver<()>)>, ) -> Option> { - match pid { - Some(pid) => { - Some(tokio::task::spawn_blocking(move || watch_pid(pid, Duration::from_secs(1))).await) - } + match watch { + Some((pid, term)) => Some( + tokio::task::spawn_blocking(move || watch_pid(pid, Duration::from_secs(1), term)).await, + ), None => None, } } // watch given PID and return when it dies -fn watch_pid(pid: Pid, interval: Duration) { +fn watch_pid(pid: Pid, interval: Duration, mut term: Receiver<()>) { log::info!("watching PID {pid}"); let mut sys = System::new(); @@ -257,15 +262,20 @@ fn watch_pid(pid: Pid, interval: Duration) { if let Some(p) = sys.process(pid) { match p.status() { ProcessStatus::Zombie | ProcessStatus::Dead => { - break; + log::info!("PID {pid} died"); + return; } _ => {} } } - std::thread::sleep(interval); + match term.try_recv() { + Ok(_) | Err(TryRecvError::Closed) => { + log::debug!("PID watcher received termination signal"); + return; + } + _ => std::thread::sleep(interval), + } } - - log::info!("PID {pid} died"); } #[cfg(test)] @@ -273,13 +283,15 @@ mod tests { use std::process::Command; use sysinfo::{Pid, PidExt}; + use tokio::sync::oneshot; #[tokio::test] async fn watch_pid_if_needed() { // Don't watch assert!(super::watch_pid_if_needed(None).await.is_none()); // Watch - super::watch_pid_if_needed(Some(Pid::from(0))) + let (_term_tx, term_rx) = oneshot::channel(); + super::watch_pid_if_needed(Some((Pid::from(0), term_rx))) .await .expect("should be some") .expect("should be OK"); @@ -289,11 +301,12 @@ mod tests { async fn watching_pid_zombie() { let mut proc = Command::new("sleep").arg("99999").spawn().unwrap(); let pid = proc.id(); - + let (_term_tx, term_rx) = oneshot::channel(); let handle = tokio::task::spawn_blocking(move || { super::watch_pid( sysinfo::Pid::from_u32(pid), std::time::Duration::from_millis(10), + term_rx, ) }); // just kill leaves a zombie process @@ -305,11 +318,13 @@ mod tests { async fn watching_pid_reaped() { let mut proc = Command::new("sleep").arg("99999").spawn().unwrap(); let pid = proc.id(); + let (_term_tx, term_rx) = oneshot::channel(); let handle = tokio::task::spawn_blocking(move || { super::watch_pid( sysinfo::Pid::from_u32(pid), std::time::Duration::from_millis(10), + term_rx, ) }); @@ -318,4 +333,36 @@ mod tests { proc.wait().unwrap(); handle.await.unwrap(); } + + #[tokio::test] + async fn terminate_watching_pid() { + let mut proc = Command::new("sleep").arg("99999").spawn().unwrap(); + let pid = proc.id(); + let (term_tx, term_rx) = oneshot::channel(); + let handle = tokio::task::spawn_blocking(move || { + super::watch_pid( + sysinfo::Pid::from_u32(pid), + std::time::Duration::from_millis(10), + term_rx, + ) + }); + // Terminate by closing the channel + drop(term_tx); + handle.await.unwrap(); + + let (term_tx, term_rx) = oneshot::channel(); + let handle = tokio::task::spawn_blocking(move || { + super::watch_pid( + sysinfo::Pid::from_u32(pid), + std::time::Duration::from_millis(10), + term_rx, + ) + }); + // Terminate by sending a signal + term_tx.send(()).unwrap(); + handle.await.unwrap(); + + proc.kill().unwrap(); + proc.wait().unwrap(); + } }