diff --git a/Cargo.lock b/Cargo.lock index 4e1a3a8a..88b12d5c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2793,6 +2793,16 @@ dependencies = [ "thiserror", ] +[[package]] +name = "range-set" +version = "0.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "714bc4849c399f77ab82177e9b4f012e02f3a7d6191023e016334edde5b21050" +dependencies = [ + "num-traits 0.2.17", + "smallvec", +] + [[package]] name = "raw-cpuid" version = "10.7.0" @@ -2895,9 +2905,9 @@ checksum = "e898588f33fdd5b9420719948f9f2a32c922a246964576f71ba7f24f80610fbc" [[package]] name = "reqwest" -version = "0.11.23" +version = "0.11.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" +checksum = "c6920094eb85afde5e4a138be3f2de8bbdf28000f0029e72c45025a56b042251" dependencies = [ "base64 0.21.7", "bytes", @@ -2917,9 +2927,11 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", + "sync_wrapper", "system-configuration", "tokio", "tokio-native-tls", @@ -3087,14 +3099,16 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.10" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" +checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41" dependencies = [ "log", "ring 0.17.7", + "rustls-pki-types", "rustls-webpki", - "sct", + "subtle", + "zeroize", ] [[package]] @@ -3106,13 +3120,30 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustls-pemfile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35e4980fa29e4c4b212ffb3db068a564cbf560e51d3944b7c88bd8bf5bec64f4" +dependencies = [ + "base64 0.21.7", + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a716eb65e3158e90e17cd93d855216e27bde02745ab842f2cab4a39dba1bacf" + [[package]] name = "rustls-webpki" -version = "0.101.7" +version = "0.102.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +checksum = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610" dependencies = [ "ring 0.17.7", + "rustls-pki-types", "untrusted 0.9.0", ] @@ -3194,16 +3225,6 @@ dependencies = [ "thiserror", ] -[[package]] -name = "sct" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" -dependencies = [ - "ring 0.17.7", - "untrusted 0.9.0", -] - [[package]] name = "secrecy" version = "0.8.0" @@ -3330,6 +3351,7 @@ name = "service" version = "0.7.0" dependencies = [ "async-stream", + "axum 0.7.4", "clap 4.4.18", "env_logger", "eyre", @@ -3338,8 +3360,11 @@ dependencies = [ "mockall", "post-rs", "prost", + "range-set", "rcgen", + "reqwest", "rstest 0.18.2", + "serde", "sysinfo", "tempfile", "tokio", @@ -3727,11 +3752,12 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.24.1" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" dependencies = [ "rustls", + "rustls-pki-types", "tokio", ] @@ -3788,9 +3814,9 @@ dependencies = [ [[package]] name = "tonic" -version = "0.10.2" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" +checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" dependencies = [ "async-stream", "async-trait", @@ -3805,8 +3831,8 @@ dependencies = [ "percent-encoding", "pin-project", "prost", - "rustls", - "rustls-pemfile", + "rustls-pemfile 2.0.0", + "rustls-pki-types", "tokio", "tokio-rustls", "tokio-stream", @@ -3818,9 +3844,9 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.10.2" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d021fc044c18582b9a2408cd0dd05b1596e3ecdb5c4df822bb0183545683889" +checksum = "be4ef6dd70a610078cb4e338a0f79d06bc759ff1b22d2120c2ff02ae264ba9c2" dependencies = [ "prettyplease", "proc-macro2", diff --git a/README.md b/README.md index ff689c43..73cd72cc 100644 --- a/README.md +++ b/README.md @@ -18,3 +18,5 @@ Includes: ### Randomx-rs [RandomX](https://github.com/tevador/randomx), that [randomx-rs](https://github.com/spacemeshos/randomx-rs) depends on, requires **cmake**. Follow [these instructions](https://github.com/spacemeshos/randomx-rs#build-dependencies) to install it. +## Post Service +Please refer to [service README](service/README.md) for instructions. diff --git a/benches/verifying.rs b/benches/verifying.rs index dc2ee48d..d68def8b 100644 --- a/benches/verifying.rs +++ b/benches/verifying.rs @@ -6,7 +6,7 @@ use post::{ initialize::{CpuInitializer, Initialize}, metadata::ProofMetadata, pow::randomx::{PoW, RandomXFlag}, - prove::generate_proof, + prove::{generate_proof, NoopProgressReporter}, verification::{Mode, Verifier}, }; #[cfg(not(windows))] @@ -44,7 +44,17 @@ fn verifying(c: &mut Criterion) { let pow_flags = RandomXFlag::get_recommended_flags(); // Generate a proof let stop = AtomicBool::new(false); - let proof = generate_proof(datadir.path(), challenge, cfg, 32, 1, pow_flags, stop).unwrap(); + let proof = generate_proof( + datadir.path(), + challenge, + cfg, + 32, + 1, + pow_flags, + stop, + NoopProgressReporter {}, + ) + .unwrap(); let metadata = ProofMetadata::new(metadata, *challenge); // Bench verifying the proof diff --git a/certifier/tests/test_certify.rs b/certifier/tests/test_certify.rs index 19732d02..67a0acb0 100644 --- a/certifier/tests/test_certify.rs +++ b/certifier/tests/test_certify.rs @@ -7,7 +7,7 @@ use post::{ initialize::{CpuInitializer, Initialize}, metadata::ProofMetadata, pow::randomx::RandomXFlag, - prove::generate_proof, + prove::{self, generate_proof}, }; use reqwest::StatusCode; use tokio::net::TcpListener; @@ -45,7 +45,17 @@ async fn test_certificate_post_proof() { // Generate a proof let pow_flags = RandomXFlag::get_recommended_flags(); let stop = AtomicBool::new(false); - let proof = generate_proof(datadir.path(), challenge, cfg, 32, 1, pow_flags, stop).unwrap(); + let proof = generate_proof( + datadir.path(), + challenge, + cfg, + 32, + 1, + pow_flags, + stop, + prove::NoopProgressReporter {}, + ) + .unwrap(); let metadata = ProofMetadata::new(metadata, *challenge); // Spawn the certifier service diff --git a/ffi/src/post_impl.rs b/ffi/src/post_impl.rs index bac634cf..480e8f0d 100644 --- a/ffi/src/post_impl.rs +++ b/ffi/src/post_impl.rs @@ -103,7 +103,16 @@ fn _generate_proof( let challenge = challenge.try_into()?; let stop = AtomicBool::new(false); - let proof = prove::generate_proof(datadir, challenge, cfg, nonces, threads, pow_flags, stop)?; + let proof = prove::generate_proof( + datadir, + challenge, + cfg, + nonces, + threads, + pow_flags, + stop, + prove::NoopProgressReporter {}, + )?; Ok(Box::new(Proof::from(proof))) } diff --git a/service/Cargo.toml b/service/Cargo.toml index 32ab0113..13e4dce6 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -10,7 +10,7 @@ path = "src/lib.rs" [dependencies] post-rs = { path = "../" } prost = "0.12.1" -tonic = { version = "0.10.0", features = ["tls"] } +tonic = { version = "0.11.0", features = ["tls"] } tokio = { version = "1.0", features = [ "rt-multi-thread", "macros", @@ -26,11 +26,15 @@ clap = { version = "4.4.4", features = ["derive"] } hex = "0.4.3" mockall = "0.11.4" sysinfo = "0.29.10" +axum = "0.7.4" +serde = "1.0.196" +range-set = "0.0.11" [build-dependencies] -tonic-build = "0.10.0" +tonic-build = "0.11.0" [dev-dependencies] rcgen = "0.11.3" +reqwest = { version = "0.11.24", features = ["json"] } rstest = "0.18.2" tempfile = "3.8.0" diff --git a/service/README.md b/service/README.md new file mode 100644 index 00000000..96b90aaf --- /dev/null +++ b/service/README.md @@ -0,0 +1,54 @@ +# Post Service +Post service allows to separate expensive PoST proving from a node by allowing to generate a proof on a different machine. It connects to the node via GRPC (on an address pointed by `--address`) and awaits commands from the node. + +## How to run +First of all, the service currently doesn't support initializing PoST data. The data must be initialized separately (presumably using [postcli](https://github.com/spacemeshos/post/tree/develop/cmd/postcli) and placed in a directory pointed to by `--dir`). + +#### Example running on an un-encrypted channel, with the default configuration of _threads_ and _nonces_ +```sh +service --address=http://my-node-address.org --dir=./post-data +``` + +#### Example running on an encrypted (mTLS) channel, with the custom _threads_ and _nonces_ +For mTLS, you need to pass the certificate and private key of the post-service with `--cert` and `-key`, as well as the CA of the server with `--ca-cert`: +```sh +service --address=https://my-node-address.org --cert=client.pem --key=client-key.pem --ca-cert=server-rootCA.pem --dir=./post-data --threads=8 --nonces=288 +``` + +A full usage/help can be viewed with +```sh +service --help +``` + +## Operator API +The operator API is a set of HTTP endpoints allowing control of the post service. + +It is enabled by providing `--operator-address=
`, i.e. `--operator-address=127.0.0.1:50051` CLI argument. + +### Example usage +#### Querying post service status +```sh +# Not doing anything +❯ curl http://localhost:50051/status +"Idle" + +# Proving +❯ curl http://localhost:50051/status +{"Proving":{"nonces":{"start":0,"end":128},"position":0}} + +# Proving, read some data already +❯ curl http://localhost:50051/status +{"Proving":{"nonces":{"start":0,"end":128},"position":10000}} + +# Started second pass +❯ curl http://localhost:50051/status +{"Proving":{"nonces":{"start":128,"end":256},"position":10000}} + +# Finished proving, but the node has not fetched the proof yet +❯ curl http://localhost:50051/status +"DoneProving" + +# Finished proving and the node has fetched the proof +❯ curl http://localhost:50051/status +"Idle" +``` diff --git a/service/api b/service/api index cf5fa12b..10baa944 160000 --- a/service/api +++ b/service/api @@ -1 +1 @@ -Subproject commit cf5fa12bf3dcae33f4472b120a7391e42dd68ad0 +Subproject commit 10baa9442c3416da98e9da57be96c759d725c916 diff --git a/service/src/lib.rs b/service/src/lib.rs index 431311a8..382b7173 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -1,2 +1,3 @@ pub mod client; +pub mod operator; pub mod service; diff --git a/service/src/main.rs b/service/src/main.rs index 643a1d48..bdb32b18 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -1,13 +1,14 @@ -use std::{fs::read_to_string, path::PathBuf, time::Duration}; +use std::{fs::read_to_string, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; use clap::{Args, Parser, ValueEnum}; use eyre::Context; use sysinfo::{Pid, ProcessExt, ProcessStatus, System, SystemExt}; +use tokio::net::TcpListener; use tokio::sync::oneshot::{self, error::TryRecvError, Receiver}; use tonic::transport::{Certificate, Identity}; use post::pow::randomx::RandomXFlag; -use post_service::client; +use post_service::{client, operator}; /// Post Service #[derive(Parser, Debug)] @@ -39,6 +40,11 @@ struct Cli { /// watch PID and exit if it dies #[arg(long)] watch_pid: Option, + + /// address to listen on for operator service + /// the operator service is disabled if not specified + #[arg(long)] + operator_address: Option, } #[derive(Args, Debug)] @@ -220,6 +226,13 @@ async fn main() -> eyre::Result<()> { None }; + let service = Arc::new(service); + + if let Some(address) = args.operator_address { + let listener = TcpListener::bind(address).await?; + tokio::spawn(operator::run(listener, service.clone())); + } + let client = client::ServiceClient::new(args.address, tls, service)?; let client_handle = tokio::spawn(client.run(args.max_retries, args.reconnect_interval_s)); diff --git a/service/src/operator.rs b/service/src/operator.rs new file mode 100644 index 00000000..800ca5c1 --- /dev/null +++ b/service/src/operator.rs @@ -0,0 +1,89 @@ +//! Operator service for controlling the post service. +//! +//! It exposes an HTTP API. +//! Allows to query the status of the post service. + +use std::{ops::Range, sync::Arc}; + +use axum::{extract::State, routing::get, Json, Router}; +use serde::{Deserialize, Serialize}; +use tokio::net::TcpListener; + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +/// The Post-service state +pub enum ServiceState { + /// The service is idle. + Idle, + /// The service is currently proving. + Proving { + /// The range of nonces being proven in the current data pass. + nonces: Range, + /// The position (in bytes) in the POST data that is already checked. + position: u64, + }, + /// Finished proving, but the proof has not been fetched yet. + DoneProving, +} + +#[mockall::automock] +/// The Service trait provides funcionality required by the OperatorService. +pub trait Service { + /// Returns the current state of the service. + fn status(&self) -> ServiceState; +} + +pub async fn run(listener: TcpListener, service: Arc) -> eyre::Result<()> +where + S: Service + Sync + Send + 'static, +{ + log::info!("running operator service on {}", listener.local_addr()?); + + let app = Router::new() + .route("/status", get(status)) + .with_state(service); + + axum::serve(listener, app) + .await + .map_err(|e| eyre::eyre!("failed to serve: {e}")) +} + +async fn status(State(service): State>) -> Json +where + S: Service + Sync + Send + 'static, +{ + Json(service.status()) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use tokio::net::TcpListener; + + #[tokio::test] + async fn test_status() { + let mut svc = super::MockService::new(); + svc.expect_status() + .once() + .returning(|| super::ServiceState::Idle); + let proving_status = super::ServiceState::Proving { + nonces: 0..64, + position: 1000, + }; + svc.expect_status() + .once() + .return_const(proving_status.clone()); + + let listener = TcpListener::bind("localhost:0").await.unwrap(); + let addr: std::net::SocketAddr = listener.local_addr().unwrap(); + let url = format!("http://{addr}/status"); + + tokio::spawn(super::run(listener, Arc::new(svc))); + + let resp = reqwest::get(&url).await.unwrap(); + assert_eq!(super::ServiceState::Idle, resp.json().await.unwrap()); + + let resp = reqwest::get(&url).await.unwrap(); + assert_eq!(proving_status, resp.json().await.unwrap()); + } +} diff --git a/service/src/service.rs b/service/src/service.rs index 0ecc7d15..b6b64157 100644 --- a/service/src/service.rs +++ b/service/src/service.rs @@ -1,6 +1,7 @@ //! Post Service use std::{ + ops::{Range, RangeInclusive}, path::PathBuf, sync::{atomic::AtomicBool, Arc, Mutex}, }; @@ -9,10 +10,12 @@ use eyre::Context; use post::{ metadata::{PostMetadata, ProofMetadata}, pow::randomx::{PoW, RandomXFlag}, - prove::Proof, + prove::{self, Proof}, verification::{Mode, Verifier}, }; +use crate::operator::ServiceState; + #[derive(Debug)] pub enum ProofGenState { InProgress, @@ -20,9 +23,82 @@ pub enum ProofGenState { } #[derive(Debug)] -struct ProofGenProcess { - handle: std::thread::JoinHandle>>, - challenge: Vec, +enum ProofGenProcess { + Idle, + Running { + handle: Option>>>, + challenge: Vec, + progress: ProvingProgress, + }, + Done { + proof: eyre::Result>, + }, +} + +impl ProofGenProcess { + fn check_finished(&mut self) { + if let ProofGenProcess::Running { handle, .. } = self { + if handle.as_ref().unwrap().is_finished() { + let proof = match handle.take().unwrap().join() { + Ok(result) => result, + Err(err) => { + std::panic::resume_unwind(err); + } + }; + *self = ProofGenProcess::Done { proof }; + } + } + } +} + +#[derive(Clone, Debug, Default)] +struct ProvingProgress { + inner: Arc>, +} + +#[derive(Clone, Debug)] +struct ProvingProgressInner { + // currently processed nonces + nonces: std::ops::Range, + // already finished chunks of data + // the chunks are automatically merged when possible + chunks: range_set::RangeSet<[RangeInclusive; 20]>, +} + +impl Default for ProvingProgressInner { + fn default() -> Self { + Self { + nonces: 0..0, + chunks: range_set::RangeSet::new(), + } + } +} + +impl prove::ProgressReporter for ProvingProgress { + fn finished_chunk(&self, pos: u64, len: usize) { + if len == 0 { + return; + } + + let range = pos..=(pos + len as u64 - 1); + self.inner.lock().unwrap().chunks.insert_range(range); + } + + fn new_nonce_group(&self, nonces: std::ops::Range) { + let mut progress = self.inner.lock().unwrap(); + progress.nonces = nonces; + progress.chunks.clear(); + } +} + +impl ProvingProgress { + fn get(&self) -> (Range, u64) { + let progress = self.inner.lock().unwrap(); + ( + progress.nonces.clone(), + progress.chunks.as_ref().first().map_or(0, |r| *r.end() + 1), + ) + } } pub struct PostService { @@ -32,7 +108,7 @@ pub struct PostService { nonces: usize, threads: usize, pow_flags: RandomXFlag, - proof_generation: Mutex>, + proof_generation: Mutex, stop: Arc, } @@ -47,7 +123,7 @@ impl PostService { pow_flags: RandomXFlag, ) -> eyre::Result { Ok(Self { - proof_generation: Mutex::new(None), + proof_generation: Mutex::new(ProofGenProcess::Idle), datadir, cfg, init_cfg, @@ -60,55 +136,54 @@ impl PostService { } impl crate::client::PostService for PostService { - fn gen_proof(&self, challenge: Vec) -> eyre::Result { + fn gen_proof(&self, ch: Vec) -> eyre::Result { let mut proof_gen = self.proof_generation.lock().unwrap(); - if let Some(process) = proof_gen.as_mut() { - eyre::ensure!( - process.challenge == challenge, - "proof generation is in progress for a different challenge (current: {:X?}, requested: {:X?})", process.challenge, challenge, + proof_gen.check_finished(); + match &*proof_gen { + ProofGenProcess::Running { challenge, .. } => { + eyre::ensure!( + challenge == &ch, + "proof generation is in progress for a different challenge (current: {:X?}, requested: {:X?})", challenge, ch, ); - - if process.handle.is_finished() { + return Ok(ProofGenState::InProgress); + } + ProofGenProcess::Idle => { + let challenge: [u8; 32] = ch + .as_slice() + .try_into() + .map_err(|_| eyre::eyre!("invalid challenge format"))?; + log::info!("starting proof generation for challenge {challenge:X?}"); + let pow_flags = self.pow_flags; + let cfg = self.cfg; + let datadir = self.datadir.clone(); + let nonces = self.nonces; + let threads = self.threads; + let stop = self.stop.clone(); + let progress = ProvingProgress::default(); + let reporter = progress.clone(); + *proof_gen = ProofGenProcess::Running { + challenge: ch, + handle: Some(std::thread::spawn(move || { + post::prove::generate_proof( + &datadir, &challenge, cfg, nonces, threads, pow_flags, stop, reporter, + ) + })), + progress, + }; + } + ProofGenProcess::Done { proof } => { log::info!("proof generation is finished"); - let result = match proof_gen.take().unwrap().handle.join() { - Ok(result) => result, - Err(err) => { - std::panic::resume_unwind(err); - } + let result = match proof { + Ok(proof) => Ok(ProofGenState::Finished { + proof: proof.clone(), + }), + Err(e) => Err(eyre::eyre!("proof generation failed: {}", e)), }; - - match result { - Ok(proof) => { - return Ok(ProofGenState::Finished { proof }); - } - Err(e) => { - return Err(e); - } - } - } else { - log::info!("proof generation in progress"); - return Ok(ProofGenState::InProgress); + *proof_gen = ProofGenProcess::Idle; + return result; } } - let ch: [u8; 32] = challenge - .as_slice() - .try_into() - .map_err(|_| eyre::eyre!("invalid challenge format"))?; - log::info!("starting proof generation for challenge {ch:X?}"); - let pow_flags = self.pow_flags; - let cfg = self.cfg; - let datadir = self.datadir.clone(); - let nonces = self.nonces; - let threads = self.threads; - let stop = self.stop.clone(); - *proof_gen = Some(ProofGenProcess { - challenge, - handle: std::thread::spawn(move || { - post::prove::generate_proof(&datadir, &ch, cfg, nonces, threads, pow_flags, stop) - }), - }); - Ok(ProofGenState::InProgress) } @@ -126,13 +201,32 @@ impl crate::client::PostService for PostService { } } +impl crate::operator::Service for PostService { + fn status(&self) -> ServiceState { + let mut proof_gen = self.proof_generation.lock().unwrap(); + proof_gen.check_finished(); + match &*proof_gen { + ProofGenProcess::Running { progress, .. } => { + let (nonces, offset) = progress.get(); + ServiceState::Proving { + nonces, + position: offset, + } + } + ProofGenProcess::Idle => ServiceState::Idle, + ProofGenProcess::Done { .. } => ServiceState::DoneProving, + } + } +} + impl Drop for PostService { fn drop(&mut self) { log::info!("shutting down post service"); - if let Some(process) = self.proof_generation.lock().unwrap().take() { - log::debug!("killing proof generation process"); + if let ProofGenProcess::Running { handle, .. } = &mut *self.proof_generation.lock().unwrap() + { + log::debug!("stopping proof generation process"); self.stop.store(true, std::sync::atomic::Ordering::Relaxed); - let _ = process.handle.join().unwrap(); + let _ = handle.take().unwrap().join().unwrap(); log::debug!("proof generation process exited"); } } diff --git a/service/tests/test_operator.rs b/service/tests/test_operator.rs new file mode 100644 index 00000000..ee922c36 --- /dev/null +++ b/service/tests/test_operator.rs @@ -0,0 +1,104 @@ +use core::{panic, time}; +use std::{sync::Arc, time::Duration}; + +use post_service::operator::{self, ServiceState}; +use tokio::{net::TcpListener, time::sleep}; + +use post::{ + initialize::{CpuInitializer, Initialize}, + pow::randomx::RandomXFlag, +}; +use post_service::client::spacemesh_v1::{service_response, GenProofStatus}; + +#[allow(dead_code)] +mod server; +use server::TestServer; + +#[tokio::test] +async fn test_gen_proof_in_progress() { + // Initialize some data + let datadir = tempfile::tempdir().unwrap(); + + let cfg = post::config::ProofConfig { + k1: 8, + k2: 12, + pow_difficulty: [0xFF; 32], + }; + let init_cfg = post::config::InitConfig { + min_num_units: 1, + max_num_units: 100, + labels_per_unit: 2560, + scrypt: post::config::ScryptParams::new(2, 1, 1), + }; + + CpuInitializer::new(init_cfg.scrypt) + .initialize( + datadir.path(), + &[0xBE; 32], + &[0xCE; 32], + init_cfg.labels_per_unit, + 4, + 256, + None, + ) + .unwrap(); + + let pow_flags = RandomXFlag::get_recommended_flags(); + + let service = Arc::new( + post_service::service::PostService::new( + datadir.into_path(), + cfg, + init_cfg, + 16, + 1, + pow_flags, + ) + .unwrap(), + ); + + let mut test_server = TestServer::new().await; + let client = test_server.create_client(service.clone()); + tokio::spawn(client.run(None, time::Duration::from_secs(1))); + + // Create operator server and client + let listener = TcpListener::bind("localhost:0").await.unwrap(); + let operator_addr = format!("http://{}", listener.local_addr().unwrap()); + tokio::spawn(operator::run(listener, service)); + + let status_url = format!("{operator_addr}/status"); + let resp = reqwest::get(&status_url).await.unwrap(); + let status = resp.json().await.unwrap(); + // It starts in idle state + assert!(matches!(status, ServiceState::Idle)); + + // It transforms to Proving when a proof generation starts + let connected = test_server.connected.recv().await.unwrap(); + + loop { + let response = TestServer::generate_proof(&connected, vec![0xCA; 32]).await; + let status_resp = reqwest::get(&status_url).await.unwrap(); + let status = status_resp.json().await.unwrap(); + + if let Some(service_response::Kind::GenProof(resp)) = response.kind { + match resp.status() { + GenProofStatus::Ok => { + if resp.proof.is_some() { + assert!(matches!(status, ServiceState::Idle)); + break; + } + assert!(matches!( + status, + ServiceState::Proving { .. } | ServiceState::DoneProving + )); + } + _ => { + panic!("got error response"); + } + } + } else { + panic!("got wrong response kind"); + } + sleep(Duration::from_millis(10)).await; + } +} diff --git a/src/prove.rs b/src/prove.rs index 338e516b..f232da32 100644 --- a/src/prove.rs +++ b/src/prove.rs @@ -9,6 +9,7 @@ //! TODO: explain use std::borrow::{Borrow, Cow}; + use std::sync::{ atomic::{AtomicBool, Ordering}, Mutex, @@ -18,6 +19,7 @@ use std::{collections::HashMap, ops::Range, path::Path, time::Instant}; use aes::cipher::block_padding::NoPadding; use aes::cipher::BlockEncrypt; use eyre::Context; +use mockall::automock; use primitive_types::U256; use randomx_rs::RandomXFlag; use rayon::prelude::{ParallelBridge, ParallelIterator}; @@ -77,6 +79,19 @@ impl ProvingParams { } } +#[automock] +pub trait ProgressReporter { + fn new_nonce_group(&self, nonces: Range); + fn finished_chunk(&self, position: u64, len: usize); +} + +pub struct NoopProgressReporter {} + +impl ProgressReporter for NoopProgressReporter { + fn new_nonce_group(&self, _: Range) {} + fn finished_chunk(&self, _: u64, _: usize) {} +} + pub trait Prover { fn prove(&self, batch: &[u8], index: u64, consume: F) -> Option<(u32, Vec)> where @@ -265,17 +280,19 @@ impl Prover for Prover8_56 { /// Generate a proof that data is still held, given the challenge. #[allow(clippy::too_many_arguments)] -pub fn generate_proof( +pub fn generate_proof( datadir: &Path, challenge: &[u8; 32], cfg: ProofConfig, - nonces: usize, + nonces_size: usize, threads: usize, pow_flags: RandomXFlag, stop: Stopper, + reporter: Reporter, ) -> eyre::Result> where Stopper: Borrow, + Reporter: ProgressReporter + Send + Sync, { let stop = stop.borrow(); let metadata = metadata::load(datadir).wrap_err("loading metadata")?; @@ -283,8 +300,7 @@ where log::info!("generating proof with PoW flags: {pow_flags:?} and params: {params:?}"); let pow_prover = pow::randomx::PoW::new(pow_flags)?; - let mut start_nonce = 0; - let mut end_nonce = start_nonce + nonces as u32; + let mut nonces = 0..nonces_size as u32; let pool = rayon::ThreadPoolBuilder::new() .num_threads(threads) @@ -296,33 +312,30 @@ where if stop.load(Ordering::Relaxed) { eyre::bail!("proof generation was stopped"); } + reporter.new_nonce_group(nonces.clone()); let indexes = Mutex::new(HashMap::>::new()); let pow_time = Instant::now(); let prover = pool.install(|| { - Prover8_56::new( - challenge, - start_nonce..end_nonce, - params, - &pow_prover, - &metadata.node_id, - ) - .wrap_err("creating prover") + let miner_id = &metadata.node_id; + Prover8_56::new(challenge, nonces.clone(), params, &pow_prover, miner_id) + .wrap_err("creating prover") })?; - let pow_mins = pow_time.elapsed().as_secs() / 60; - log::info!("Finished k2pow in {} minutes", pow_mins); + let pow_secs = pow_time.elapsed().as_secs(); + let pow_mins = pow_secs / 60; + log::info!("finished k2pow in {pow_mins}m {}s", pow_secs % 60); let read_time = Instant::now(); let data_reader = read_data(datadir, 1024 * 1024, metadata.max_file_size)?; - log::info!("Started reading POST data"); + log::info!("started reading POST data"); let result = pool.install(|| { data_reader .par_bridge() .take_any_while(|_| !stop.load(Ordering::Relaxed)) .find_map_any(|batch| { - prover.prove( + let res = prover.prove( &batch.data, batch.pos / BLOCK_SIZE as u64, |nonce, index| { @@ -334,24 +347,31 @@ where } None }, - ) + ); + reporter.finished_chunk(batch.pos, batch.data.len()); + + res }) }); - - let read_mins = read_time.elapsed().as_secs() / 60; - log::info!("Finished reading POST data in {} minutes", read_mins); + let read_secs = read_time.elapsed().as_secs(); + let read_mins = read_secs / 60; + log::info!( + "finished reading POST data in {read_mins}m {}s", + read_secs % 60 + ); if let Some((nonce, indices)) = result { let num_labels = metadata.num_units as u64 * metadata.labels_per_unit; let pow = prover.get_pow(nonce).unwrap(); - let total_minutes = total_time.elapsed().as_secs() / 60; + let total_secs = total_time.elapsed().as_secs(); + let total_mins = total_secs / 60; - log::info!("Found proof for nonce: {nonce}, pow: {pow} with {indices:?} indices. Proof took {total_minutes} minutes"); + log::info!("found proof for nonce: {nonce}, pow: {pow} with {indices:?} indices. It took {total_mins}m {}s", total_secs % 60); return Ok(Proof::new(nonce, &indices, num_labels, pow)); } - (start_nonce, end_nonce) = (end_nonce, end_nonce + nonces as u32); + nonces = nonces.end..(nonces.end + nonces_size as u32); } } diff --git a/tests/generate_and_verify.rs b/tests/generate_and_verify.rs index 9c2bfcdd..350466be 100644 --- a/tests/generate_and_verify.rs +++ b/tests/generate_and_verify.rs @@ -6,7 +6,7 @@ use post::{ initialize::{CpuInitializer, Initialize}, metadata::ProofMetadata, pow::randomx::{PoW, RandomXFlag}, - prove::{generate_proof, Proof}, + prove::{self, generate_proof, Proof}, verification::{Error, Mode, Verifier}, }; use tempfile::tempdir; @@ -44,7 +44,21 @@ fn test_generate_and_verify() { let pow_flags = RandomXFlag::get_recommended_flags(); // Generate a proof let stop = AtomicBool::new(false); - let proof = generate_proof(datadir.path(), challenge, cfg, 32, 1, pow_flags, stop).unwrap(); + let mut reporter = prove::MockProgressReporter::new(); + reporter.expect_new_nonce_group().once().return_const(()); + reporter.expect_finished_chunk().times(1..).return_const(()); + + let proof = generate_proof( + datadir.path(), + challenge, + cfg, + 32, + 1, + pow_flags, + stop, + reporter, + ) + .unwrap(); // Verify the proof let metadata = ProofMetadata::new(metadata, *challenge); @@ -132,7 +146,17 @@ fn test_generate_and_verify_difficulty_msb_not_zero() { let pow_flags = RandomXFlag::get_recommended_flags(); // Generate a proof let stop = AtomicBool::new(false); - let proof = generate_proof(datadir.path(), challenge, cfg, 32, 1, pow_flags, stop).unwrap(); + let proof = generate_proof( + datadir.path(), + challenge, + cfg, + 32, + 1, + pow_flags, + stop, + prove::NoopProgressReporter {}, + ) + .unwrap(); // Verify the proof let metadata = ProofMetadata::new(metadata, *challenge);