From ed689df04f9af5604230db7580f804a108d1aafe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Wed, 25 Oct 2023 15:37:14 +0200 Subject: [PATCH 1/7] Basic operator API --- Cargo.lock | 18 ++++- README.md | 3 + service/Cargo.toml | 1 + service/README.md | 37 ++++++++++ service/api | 2 +- service/build.rs | 8 ++- service/src/lib.rs | 1 + service/src/main.rs | 46 +++++++++---- service/src/operator.rs | 119 +++++++++++++++++++++++++++++++++ service/src/service.rs | 11 +++ service/tests/test_operator.rs | 94 ++++++++++++++++++++++++++ 11 files changed, 322 insertions(+), 18 deletions(-) create mode 100644 service/README.md create mode 100644 service/src/operator.rs create mode 100644 service/tests/test_operator.rs diff --git a/Cargo.lock b/Cargo.lock index 49dfcc3d..88ecc3d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -160,9 +160,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.73" +version = "0.1.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" +checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ "proc-macro2", "quote", @@ -2508,6 +2508,7 @@ dependencies = [ "tokio-stream", "tonic", "tonic-build", + "tonic-reflection", ] [[package]] @@ -2887,6 +2888,19 @@ dependencies = [ "syn 2.0.29", ] +[[package]] +name = "tonic-reflection" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fa37c513df1339d197f4ba21d28c918b9ef1ac1768265f11ecb6b7f1cba1b76" +dependencies = [ + "prost", + "prost-types", + "tokio", + "tokio-stream", + "tonic", +] + [[package]] name = "tower" version = "0.4.13" diff --git a/README.md b/README.md index be17be2b..eea299b6 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,9 @@ Includes: ### Randomx-rs [RandomX](https://github.com/tevador/randomx), that [randomx-rs](https://github.com/spacemeshos/randomx-rs) depends 
on, requires **cmake**. Follow [these instructions](https://github.com/spacemeshos/randomx-rs#build-dependencies) to install it. +## Post Service +Please refer to [service README](service/README.md) for instructions. + ## Troubleshooting ### Crash on Mac arm64 RandomX is known to misbehave, or even crash on arm64 Macs when using JIT. See this issue for details: https://github.com/tevador/RandomX/issues/262. diff --git a/service/Cargo.toml b/service/Cargo.toml index 6bbbd05d..bc144590 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -11,6 +11,7 @@ path = "src/lib.rs" post-rs = { path = "../" } prost = "0.12.1" tonic = { version = "0.10.0", features = ["tls"] } +tonic-reflection = "0.10.2" tokio = { version = "1.0", features = [ "rt-multi-thread", "macros", diff --git a/service/README.md b/service/README.md new file mode 100644 index 00000000..707e6203 --- /dev/null +++ b/service/README.md @@ -0,0 +1,37 @@ +# Post Service +Post service allows to separate expensive PoST proving from a node by allowing to generate a proof on a different machine. It connects to the node via GRPC (on an address pointed by `--address`) and awaits commands from the node. + +## How to run +First of all, the service currently doesn't support initializing PoST data. The data must be initialized separately (presumably using [postcli](https://github.com/spacemeshos/post/tree/develop/cmd/postcli) and placed in a directory pointed to by `--dir`). 
+ +#### Example running on an un-encrypted channel, with the default configuration of _threads_ and _nonces_ +```sh +service --address=http://my-node-address.org --dir=./post-data +``` + +#### Example running on an encrypted (mTLS) channel, with the custom _threads_ and _nonces_ +For mTLS, you need to pass the certificate and private key of the post-service with `--cert` and `--key`, as well as the CA of the server with `--ca-cert`: +```sh +service --address=https://my-node-address.org --cert=client.pem --key=client-key.pem --ca-cert=server-rootCA.pem --dir=./post-data --threads=8 --nonces=288 +``` + +A full usage/help can be viewed with +```sh +service --help +``` + +## Operator API +The operator API is a set of GRPC endpoints allowing control of the post service. + +The GRPC API supports reflection for easy use with tools like [grpcurl](https://github.com/fullstorydev/grpcurl). + +It is enabled by providing `--operator-address=<address>
`, i.e. `--operator-address=127.0.0.1:50051` CLI argument. + +### Example usage +#### Querying post service status +```sh +❯ grpcurl -plaintext localhost:50051 spacemesh.v1.PostServiceOperator/Status +{ + "status": "IDLE" +} +``` \ No newline at end of file diff --git a/service/api b/service/api index cf5fa12b..f2516cf2 160000 --- a/service/api +++ b/service/api @@ -1 +1 @@ -Subproject commit cf5fa12bf3dcae33f4472b120a7391e42dd68ad0 +Subproject commit f2516cf212831b08fc1e9b84df6679d0eccf5b05 diff --git a/service/build.rs b/service/build.rs index b4eb8a0a..e66f81ca 100644 --- a/service/build.rs +++ b/service/build.rs @@ -1,4 +1,10 @@ +use std::{env, path::PathBuf}; + fn main() -> Result<(), Box> { - tonic_build::configure().compile(&["api/spacemesh/v1/post.proto"], &["api"])?; + let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); + tonic_build::configure() + .file_descriptor_set_path(out_dir.join("post_descriptor.bin")) + .compile(&["api/spacemesh/v1/post.proto"], &["api"])?; + Ok(()) } diff --git a/service/src/lib.rs b/service/src/lib.rs index 431311a8..382b7173 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -1,2 +1,3 @@ pub mod client; +pub mod operator; pub mod service; diff --git a/service/src/main.rs b/service/src/main.rs index 75cf63ac..42607995 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -1,11 +1,12 @@ -use std::{fs::read_to_string, path::PathBuf, time::Duration}; +use std::{fs::read_to_string, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; use clap::{Args, Parser, ValueEnum}; use eyre::Context; +use tokio::net::TcpListener; use tonic::transport::{Certificate, Identity}; use post::pow::randomx::RandomXFlag; -use post_service::client; +use post_service::{client, operator}; /// Post Service #[derive(Parser, Debug)] @@ -29,6 +30,11 @@ struct Cli { #[command(flatten, next_help_heading = "TLS configuration")] tls: Option, + + /// address to listen on for operator service + /// the operator service is disabled if 
not specified + #[arg(long)] + operator_address: Option, } #[derive(Args, Debug)] @@ -55,6 +61,22 @@ struct PostConfig { scrypt: ScryptParams, } +impl From for post::config::Config { + fn from(val: PostConfig) -> Self { + post::config::Config { + k1: val.k1, + k2: val.k2, + k3: val.k3, + pow_difficulty: val.pow_difficulty, + scrypt: post::ScryptParams::new( + val.scrypt.n.ilog2() as u8 - 1, + val.scrypt.r.ilog2() as u8, + val.scrypt.p.ilog2() as u8, + ), + } + } +} + /// Scrypt parameters for initialization #[derive(Args, Debug)] struct ScryptParams { @@ -161,17 +183,7 @@ async fn main() -> eyre::Result<()> { let service = post_service::service::PostService::new( args.dir, - post::config::Config { - k1: args.post_config.k1, - k2: args.post_config.k2, - k3: args.post_config.k3, - pow_difficulty: args.post_config.pow_difficulty, - scrypt: post::ScryptParams::new( - args.post_config.scrypt.n.ilog2() as u8 - 1, - args.post_config.scrypt.r.ilog2() as u8, - args.post_config.scrypt.p.ilog2() as u8, - ), - }, + args.post_config.into(), args.post_settings.nonces, args.post_settings.threads, args.post_settings.randomx_mode.into(), @@ -199,7 +211,13 @@ async fn main() -> eyre::Result<()> { None }; - let client = client::ServiceClient::new(args.address, args.reconnect_interval_s, tls, service)?; + let service = Arc::new(service); + + if let Some(address) = args.operator_address { + let listener = TcpListener::bind(address).await?; + tokio::spawn(operator::OperatorServer::run(listener, service.clone())); + } + let client = client::ServiceClient::new(args.address, args.reconnect_interval_s, tls, service)?; client.run().await } diff --git a/service/src/operator.rs b/service/src/operator.rs new file mode 100644 index 00000000..e27a8226 --- /dev/null +++ b/service/src/operator.rs @@ -0,0 +1,119 @@ +//! Operator service for controlling the post service. +//! +//! It exposes a GRPC API defined in `spacemesh.v1.post.proto`. +//! Allows to query the status of the post service. 
+ +use std::sync::Arc; + +use tokio::net::TcpListener; +use tokio_stream::wrappers::TcpListenerStream; +use tonic::{transport::Server, Request, Response, Status}; + +use spacemesh_v1::post_service_operator_server::{PostServiceOperator, PostServiceOperatorServer}; +use spacemesh_v1::{PostServiceStatusRequest, PostServiceStatusResponse}; + +pub mod spacemesh_v1 { + tonic::include_proto!("spacemesh.v1"); + pub(crate) const FILE_DESCRIPTOR_SET: &[u8] = + tonic::include_file_descriptor_set!("post_descriptor"); +} + +pub enum ServiceState { + Idle, + Proving, +} + +#[mockall::automock] +/// The Service trait provides funcionality required by the OperatorService. +pub trait Service { + /// Returns the current state of the service. + fn status(&self) -> ServiceState; +} + +#[derive(Debug, Default)] +pub struct OperatorService { + operator: Arc, +} + +#[tonic::async_trait] +impl PostServiceOperator for OperatorService { + async fn status( + &self, + request: Request, + ) -> Result, Status> { + log::debug!("got a request from {:?}", request.remote_addr()); + + let status = match self.operator.status() { + ServiceState::Idle => spacemesh_v1::post_service_status_response::Status::Idle, + ServiceState::Proving => spacemesh_v1::post_service_status_response::Status::Proving, + }; + + Ok(Response::new(PostServiceStatusResponse { + status: status as _, + })) + } +} + +#[derive(Debug, Default)] +pub struct OperatorServer {} + +impl OperatorServer { + pub async fn run(listener: TcpListener, operator: Arc) -> eyre::Result<()> + where + O: Service + Sync + Send + 'static, + { + log::info!("running operator service on {}", listener.local_addr()?); + + let reflection_service = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(spacemesh_v1::FILE_DESCRIPTOR_SET) + .build()?; + + let operator_service = PostServiceOperatorServer::new(OperatorService { operator }); + + Server::builder() + .add_service(reflection_service) + .add_service(operator_service) + 
.serve_with_incoming(TcpListenerStream::new(listener)) + .await + .map_err(|e| eyre::eyre!("failed to serve: {e}")) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use tokio::net::TcpListener; + + use super::spacemesh_v1::post_service_operator_client::PostServiceOperatorClient; + use super::spacemesh_v1::post_service_status_response::Status; + use super::spacemesh_v1::PostServiceStatusRequest; + + #[tokio::test] + async fn test_status() { + let mut svc_operator = super::MockService::new(); + svc_operator + .expect_status() + .once() + .returning(|| super::ServiceState::Idle); + svc_operator + .expect_status() + .once() + .returning(|| super::ServiceState::Proving); + + let listener = TcpListener::bind("localhost:0").await.unwrap(); + let addr: std::net::SocketAddr = listener.local_addr().unwrap(); + + tokio::spawn(super::OperatorServer::run(listener, Arc::new(svc_operator))); + + let mut client = PostServiceOperatorClient::connect(format!("http://{addr}")) + .await + .unwrap(); + + let response = client.status(PostServiceStatusRequest {}).await.unwrap(); + assert_eq!(response.into_inner().status(), Status::Idle); + + let response = client.status(PostServiceStatusRequest {}).await.unwrap(); + assert_eq!(response.into_inner().status(), Status::Proving); + } +} diff --git a/service/src/service.rs b/service/src/service.rs index f8062795..8aa6933d 100644 --- a/service/src/service.rs +++ b/service/src/service.rs @@ -122,6 +122,17 @@ impl crate::client::PostService for PostService { } } +impl crate::operator::Service for PostService { + fn status(&self) -> crate::operator::ServiceState { + let proof_gen = self.proof_generation.lock().unwrap(); + if proof_gen.as_ref().is_some() { + crate::operator::ServiceState::Proving + } else { + crate::operator::ServiceState::Idle + } + } +} + impl Drop for PostService { fn drop(&mut self) { log::info!("shutting down post service"); diff --git a/service/tests/test_operator.rs b/service/tests/test_operator.rs new file mode 
100644 index 00000000..e957f300 --- /dev/null +++ b/service/tests/test_operator.rs @@ -0,0 +1,94 @@ +use core::panic; +use std::{sync::Arc, time::Duration}; + +use tokio::{net::TcpListener, time::sleep}; + +use post::{ + initialize::{CpuInitializer, Initialize}, + pow::randomx::RandomXFlag, + ScryptParams, +}; +use post_service::{ + client::spacemesh_v1::{service_response, GenProofStatus}, + operator::{ + spacemesh_v1::{ + post_service_operator_client::PostServiceOperatorClient, + post_service_status_response::Status, PostServiceStatusRequest, + }, + OperatorServer, + }, +}; + +#[allow(dead_code)] +mod server; +use server::TestServer; + +#[tokio::test] +async fn test_gen_proof_in_progress() { + // Initialize some data + let datadir = tempfile::tempdir().unwrap(); + + let cfg = post::config::Config { + k1: 8, + k2: 4, + k3: 4, + pow_difficulty: [0xFF; 32], + scrypt: ScryptParams::new(0, 0, 0), + }; + + CpuInitializer::new(cfg.scrypt) + .initialize(datadir.path(), &[0xBE; 32], &[0xCE; 32], 256, 4, 256, None) + .unwrap(); + + let pow_flags = RandomXFlag::get_recommended_flags(); + + let service = Arc::new( + post_service::service::PostService::new(datadir.into_path(), cfg, 16, 1, pow_flags) + .unwrap(), + ); + + let mut test_server = TestServer::new().await; + let client = test_server.create_client(service.clone()); + tokio::spawn(client.run()); + + // Create operator server and client + let listener = TcpListener::bind("localhost:0").await.unwrap(); + let operator_addr = format!("http://{}", listener.local_addr().unwrap()); + tokio::spawn(OperatorServer::run(listener, service)); + let mut client = PostServiceOperatorClient::connect(operator_addr) + .await + .unwrap(); + + // It starts in idle state + let response = client.status(PostServiceStatusRequest {}).await.unwrap(); + assert_eq!(response.into_inner().status(), Status::Idle); + + // It transforms to Proving when a proof generation starts + let connected = test_server.connected.recv().await.unwrap(); + 
TestServer::generate_proof(&connected, vec![0xCA; 32]).await; + let response = client.status(PostServiceStatusRequest {}).await.unwrap(); + assert_eq!(response.into_inner().status(), Status::Proving); + + loop { + let response = TestServer::generate_proof(&connected, vec![0xCA; 32]).await; + if let Some(service_response::Kind::GenProof(resp)) = response.kind { + match resp.status() { + GenProofStatus::Ok => { + if resp.proof.is_some() { + break; + } + } + _ => { + panic!("Got error response from GenProof"); + } + } + } else { + unreachable!(); + } + sleep(Duration::from_millis(10)).await; + } + + // It transforms back to Idle when the proof generation finishes + let response = client.status(PostServiceStatusRequest {}).await.unwrap(); + assert_eq!(response.into_inner().status(), Status::Idle); +} From c720d44f1f45bff1d43942741b8e9c37c7a3e94a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Thu, 26 Oct 2023 10:54:37 +0200 Subject: [PATCH 2/7] Rename Operator trait to Service --- service/src/operator.rs | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/service/src/operator.rs b/service/src/operator.rs index e27a8226..3aa62603 100644 --- a/service/src/operator.rs +++ b/service/src/operator.rs @@ -31,19 +31,19 @@ pub trait Service { } #[derive(Debug, Default)] -pub struct OperatorService { - operator: Arc, +pub struct OperatorService { + service: Arc, } #[tonic::async_trait] -impl PostServiceOperator for OperatorService { +impl PostServiceOperator for OperatorService { async fn status( &self, request: Request, ) -> Result, Status> { log::debug!("got a request from {:?}", request.remote_addr()); - let status = match self.operator.status() { + let status = match self.service.status() { ServiceState::Idle => spacemesh_v1::post_service_status_response::Status::Idle, ServiceState::Proving => spacemesh_v1::post_service_status_response::Status::Proving, }; @@ -58,9 +58,9 @@ impl PostServiceOperator 
for OperatorService pub struct OperatorServer {} impl OperatorServer { - pub async fn run(listener: TcpListener, operator: Arc) -> eyre::Result<()> + pub async fn run(listener: TcpListener, service: Arc) -> eyre::Result<()> where - O: Service + Sync + Send + 'static, + S: Service + Sync + Send + 'static, { log::info!("running operator service on {}", listener.local_addr()?); @@ -68,7 +68,7 @@ impl OperatorServer { .register_encoded_file_descriptor_set(spacemesh_v1::FILE_DESCRIPTOR_SET) .build()?; - let operator_service = PostServiceOperatorServer::new(OperatorService { operator }); + let operator_service = PostServiceOperatorServer::new(OperatorService { service }); Server::builder() .add_service(reflection_service) @@ -91,20 +91,18 @@ mod tests { #[tokio::test] async fn test_status() { - let mut svc_operator = super::MockService::new(); - svc_operator - .expect_status() + let mut svc = super::MockService::new(); + svc.expect_status() .once() .returning(|| super::ServiceState::Idle); - svc_operator - .expect_status() + svc.expect_status() .once() .returning(|| super::ServiceState::Proving); let listener = TcpListener::bind("localhost:0").await.unwrap(); let addr: std::net::SocketAddr = listener.local_addr().unwrap(); - tokio::spawn(super::OperatorServer::run(listener, Arc::new(svc_operator))); + tokio::spawn(super::OperatorServer::run(listener, Arc::new(svc))); let mut client = PostServiceOperatorClient::connect(format!("http://{addr}")) .await From 862574a12fa01be395bcdddc3f4efed19633ffaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Wed, 1 Nov 2023 12:09:33 +0100 Subject: [PATCH 3/7] Update api --- service/api | 2 +- service/build.rs | 6 +++-- service/src/operator.rs | 40 ++++++++++++++++++---------------- service/tests/test_operator.rs | 16 ++++++-------- 4 files changed, 33 insertions(+), 31 deletions(-) diff --git a/service/api b/service/api index f2516cf2..8cdaf63d 160000 --- a/service/api +++ b/service/api @@ -1 +1 @@ 
-Subproject commit f2516cf212831b08fc1e9b84df6679d0eccf5b05 +Subproject commit 8cdaf63db88df84fb8aa40cb4ec5d52c53000561 diff --git a/service/build.rs b/service/build.rs index e66f81ca..a08fbf3c 100644 --- a/service/build.rs +++ b/service/build.rs @@ -1,10 +1,12 @@ use std::{env, path::PathBuf}; fn main() -> Result<(), Box> { + tonic_build::configure().compile(&["api/spacemesh/v1/post.proto"], &["api"])?; + let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); tonic_build::configure() - .file_descriptor_set_path(out_dir.join("post_descriptor.bin")) - .compile(&["api/spacemesh/v1/post.proto"], &["api"])?; + .file_descriptor_set_path(out_dir.join("service_descriptor.bin")) + .compile(&["api/post/v1/service.proto"], &["api"])?; Ok(()) } diff --git a/service/src/operator.rs b/service/src/operator.rs index 3aa62603..3edc563b 100644 --- a/service/src/operator.rs +++ b/service/src/operator.rs @@ -9,13 +9,13 @@ use tokio::net::TcpListener; use tokio_stream::wrappers::TcpListenerStream; use tonic::{transport::Server, Request, Response, Status}; -use spacemesh_v1::post_service_operator_server::{PostServiceOperator, PostServiceOperatorServer}; -use spacemesh_v1::{PostServiceStatusRequest, PostServiceStatusResponse}; +use post_v1::operator_service_server::OperatorServiceServer; +use post_v1::{OperatorStatusRequest, OperatorStatusResponse}; -pub mod spacemesh_v1 { - tonic::include_proto!("spacemesh.v1"); +pub mod post_v1 { + tonic::include_proto!("post.v1"); pub(crate) const FILE_DESCRIPTOR_SET: &[u8] = - tonic::include_file_descriptor_set!("post_descriptor"); + tonic::include_file_descriptor_set!("service_descriptor"); } pub enum ServiceState { @@ -36,19 +36,21 @@ pub struct OperatorService { } #[tonic::async_trait] -impl PostServiceOperator for OperatorService { +impl post_v1::operator_service_server::OperatorService + for OperatorService +{ async fn status( &self, - request: Request, - ) -> Result, Status> { + request: Request, + ) -> Result, Status> { log::debug!("got a 
request from {:?}", request.remote_addr()); let status = match self.service.status() { - ServiceState::Idle => spacemesh_v1::post_service_status_response::Status::Idle, - ServiceState::Proving => spacemesh_v1::post_service_status_response::Status::Proving, + ServiceState::Idle => post_v1::operator_status_response::Status::Idle, + ServiceState::Proving => post_v1::operator_status_response::Status::Proving, }; - Ok(Response::new(PostServiceStatusResponse { + Ok(Response::new(OperatorStatusResponse { status: status as _, })) } @@ -65,10 +67,10 @@ impl OperatorServer { log::info!("running operator service on {}", listener.local_addr()?); let reflection_service = tonic_reflection::server::Builder::configure() - .register_encoded_file_descriptor_set(spacemesh_v1::FILE_DESCRIPTOR_SET) + .register_encoded_file_descriptor_set(post_v1::FILE_DESCRIPTOR_SET) .build()?; - let operator_service = PostServiceOperatorServer::new(OperatorService { service }); + let operator_service = OperatorServiceServer::new(OperatorService { service }); Server::builder() .add_service(reflection_service) @@ -85,9 +87,9 @@ mod tests { use tokio::net::TcpListener; - use super::spacemesh_v1::post_service_operator_client::PostServiceOperatorClient; - use super::spacemesh_v1::post_service_status_response::Status; - use super::spacemesh_v1::PostServiceStatusRequest; + use super::post_v1::operator_service_client::OperatorServiceClient; + use super::post_v1::operator_status_response::Status; + use super::post_v1::OperatorStatusRequest; #[tokio::test] async fn test_status() { @@ -104,14 +106,14 @@ mod tests { tokio::spawn(super::OperatorServer::run(listener, Arc::new(svc))); - let mut client = PostServiceOperatorClient::connect(format!("http://{addr}")) + let mut client = OperatorServiceClient::connect(format!("http://{addr}")) .await .unwrap(); - let response = client.status(PostServiceStatusRequest {}).await.unwrap(); + let response = client.status(OperatorStatusRequest {}).await.unwrap(); 
assert_eq!(response.into_inner().status(), Status::Idle); - let response = client.status(PostServiceStatusRequest {}).await.unwrap(); + let response = client.status(OperatorStatusRequest {}).await.unwrap(); assert_eq!(response.into_inner().status(), Status::Proving); } } diff --git a/service/tests/test_operator.rs b/service/tests/test_operator.rs index e957f300..9d172a56 100644 --- a/service/tests/test_operator.rs +++ b/service/tests/test_operator.rs @@ -11,9 +11,9 @@ use post::{ use post_service::{ client::spacemesh_v1::{service_response, GenProofStatus}, operator::{ - spacemesh_v1::{ - post_service_operator_client::PostServiceOperatorClient, - post_service_status_response::Status, PostServiceStatusRequest, + post_v1::{ + operator_service_client::OperatorServiceClient, operator_status_response::Status, + OperatorStatusRequest, }, OperatorServer, }, @@ -55,18 +55,16 @@ async fn test_gen_proof_in_progress() { let listener = TcpListener::bind("localhost:0").await.unwrap(); let operator_addr = format!("http://{}", listener.local_addr().unwrap()); tokio::spawn(OperatorServer::run(listener, service)); - let mut client = PostServiceOperatorClient::connect(operator_addr) - .await - .unwrap(); + let mut client = OperatorServiceClient::connect(operator_addr).await.unwrap(); // It starts in idle state - let response = client.status(PostServiceStatusRequest {}).await.unwrap(); + let response = client.status(OperatorStatusRequest {}).await.unwrap(); assert_eq!(response.into_inner().status(), Status::Idle); // It transforms to Proving when a proof generation starts let connected = test_server.connected.recv().await.unwrap(); TestServer::generate_proof(&connected, vec![0xCA; 32]).await; - let response = client.status(PostServiceStatusRequest {}).await.unwrap(); + let response = client.status(OperatorStatusRequest {}).await.unwrap(); assert_eq!(response.into_inner().status(), Status::Proving); loop { @@ -89,6 +87,6 @@ async fn test_gen_proof_in_progress() { } // It transforms 
back to Idle when the proof generation finishes - let response = client.status(PostServiceStatusRequest {}).await.unwrap(); + let response = client.status(OperatorStatusRequest {}).await.unwrap(); assert_eq!(response.into_inner().status(), Status::Idle); } From b30c5aa6ff19b1d6da708e196d14d449ee7d0c83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Wed, 1 Nov 2023 13:33:17 +0100 Subject: [PATCH 4/7] Update api to v1.24.0 --- service/api | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/api b/service/api index 8cdaf63d..10baa944 160000 --- a/service/api +++ b/service/api @@ -1 +1 @@ -Subproject commit 8cdaf63db88df84fb8aa40cb4ec5d52c53000561 +Subproject commit 10baa9442c3416da98e9da57be96c759d725c916 From a0cb3fd9ca21710447a8f29df970ae8247360999 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Tue, 13 Feb 2024 12:35:26 +0100 Subject: [PATCH 5/7] Refactor operator API into HTTP --- Cargo.lock | 79 +++++++++++++-------------- service/Cargo.toml | 8 +-- service/build.rs | 7 --- service/src/main.rs | 2 +- service/src/operator.rs | 98 +++++++++++----------------------- service/tests/test_operator.rs | 32 +++++------ 6 files changed, 89 insertions(+), 137 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d7e1a79d..0b77a697 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2895,9 +2895,9 @@ checksum = "e898588f33fdd5b9420719948f9f2a32c922a246964576f71ba7f24f80610fbc" [[package]] name = "reqwest" -version = "0.11.23" +version = "0.11.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" +checksum = "c6920094eb85afde5e4a138be3f2de8bbdf28000f0029e72c45025a56b042251" dependencies = [ "base64 0.21.7", "bytes", @@ -2917,9 +2917,11 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", + "sync_wrapper", 
"system-configuration", "tokio", "tokio-native-tls", @@ -3087,14 +3089,16 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.10" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" +checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41" dependencies = [ "log", "ring 0.17.7", + "rustls-pki-types", "rustls-webpki", - "sct", + "subtle", + "zeroize", ] [[package]] @@ -3106,13 +3110,30 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustls-pemfile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35e4980fa29e4c4b212ffb3db068a564cbf560e51d3944b7c88bd8bf5bec64f4" +dependencies = [ + "base64 0.21.7", + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a716eb65e3158e90e17cd93d855216e27bde02745ab842f2cab4a39dba1bacf" + [[package]] name = "rustls-webpki" -version = "0.101.7" +version = "0.102.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +checksum = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610" dependencies = [ "ring 0.17.7", + "rustls-pki-types", "untrusted 0.9.0", ] @@ -3194,16 +3215,6 @@ dependencies = [ "thiserror", ] -[[package]] -name = "sct" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" -dependencies = [ - "ring 0.17.7", - "untrusted 0.9.0", -] - [[package]] name = "secrecy" version = "0.8.0" @@ -3330,6 +3341,7 @@ name = "service" version = "0.7.0" dependencies = [ "async-stream", + "axum 0.7.4", "clap 4.4.18", "env_logger", "eyre", @@ -3339,14 +3351,15 @@ dependencies = [ "post-rs", "prost", "rcgen", + "reqwest", 
"rstest 0.18.2", + "serde", "sysinfo", "tempfile", "tokio", "tokio-stream", "tonic", "tonic-build", - "tonic-reflection", ] [[package]] @@ -3728,11 +3741,12 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.24.1" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" dependencies = [ "rustls", + "rustls-pki-types", "tokio", ] @@ -3789,9 +3803,9 @@ dependencies = [ [[package]] name = "tonic" -version = "0.10.2" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" +checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" dependencies = [ "async-stream", "async-trait", @@ -3806,8 +3820,8 @@ dependencies = [ "percent-encoding", "pin-project", "prost", - "rustls", - "rustls-pemfile", + "rustls-pemfile 2.0.0", + "rustls-pki-types", "tokio", "tokio-rustls", "tokio-stream", @@ -3819,9 +3833,9 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.10.2" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d021fc044c18582b9a2408cd0dd05b1596e3ecdb5c4df822bb0183545683889" +checksum = "be4ef6dd70a610078cb4e338a0f79d06bc759ff1b22d2120c2ff02ae264ba9c2" dependencies = [ "prettyplease", "proc-macro2", @@ -3830,19 +3844,6 @@ dependencies = [ "syn 2.0.48", ] -[[package]] -name = "tonic-reflection" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fa37c513df1339d197f4ba21d28c918b9ef1ac1768265f11ecb6b7f1cba1b76" -dependencies = [ - "prost", - "prost-types", - "tokio", - "tokio-stream", - "tonic", -] - [[package]] name = "tower" version = "0.4.13" diff --git a/service/Cargo.toml b/service/Cargo.toml index a607800f..e242b8fb 100644 --- 
a/service/Cargo.toml +++ b/service/Cargo.toml @@ -10,8 +10,7 @@ path = "src/lib.rs" [dependencies] post-rs = { path = "../" } prost = "0.12.1" -tonic = { version = "0.10.0", features = ["tls"] } -tonic-reflection = "0.10.2" +tonic = { version = "0.11.0", features = ["tls"] } tokio = { version = "1.0", features = [ "rt-multi-thread", "macros", @@ -27,11 +26,14 @@ clap = { version = "4.4.4", features = ["derive"] } hex = "0.4.3" mockall = "0.11.4" sysinfo = "0.29.10" +axum = "0.7.4" +serde = "1.0.196" [build-dependencies] -tonic-build = "0.10.0" +tonic-build = "0.11.0" [dev-dependencies] rcgen = "0.11.3" +reqwest = { version = "0.11.24", features = ["json"] } rstest = "0.18.2" tempfile = "3.8.0" diff --git a/service/build.rs b/service/build.rs index a08fbf3c..34df25cb 100644 --- a/service/build.rs +++ b/service/build.rs @@ -1,12 +1,5 @@ -use std::{env, path::PathBuf}; - fn main() -> Result<(), Box> { tonic_build::configure().compile(&["api/spacemesh/v1/post.proto"], &["api"])?; - let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); - tonic_build::configure() - .file_descriptor_set_path(out_dir.join("service_descriptor.bin")) - .compile(&["api/post/v1/service.proto"], &["api"])?; - Ok(()) } diff --git a/service/src/main.rs b/service/src/main.rs index 09df5998..bdb32b18 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -230,7 +230,7 @@ async fn main() -> eyre::Result<()> { if let Some(address) = args.operator_address { let listener = TcpListener::bind(address).await?; - tokio::spawn(operator::OperatorServer::run(listener, service.clone())); + tokio::spawn(operator::run(listener, service.clone())); } let client = client::ServiceClient::new(args.address, tls, service)?; diff --git a/service/src/operator.rs b/service/src/operator.rs index 3edc563b..914418fb 100644 --- a/service/src/operator.rs +++ b/service/src/operator.rs @@ -1,23 +1,15 @@ //! Operator service for controlling the post service. //! -//! 
It exposes a GRPC API defined in `spacemesh.v1.post.proto`. +//! It exposes an HTTP API. //! Allows to query the status of the post service. use std::sync::Arc; +use axum::{extract::State, routing::get, Json, Router}; +use serde::{Deserialize, Serialize}; use tokio::net::TcpListener; -use tokio_stream::wrappers::TcpListenerStream; -use tonic::{transport::Server, Request, Response, Status}; - -use post_v1::operator_service_server::OperatorServiceServer; -use post_v1::{OperatorStatusRequest, OperatorStatusResponse}; - -pub mod post_v1 { - tonic::include_proto!("post.v1"); - pub(crate) const FILE_DESCRIPTOR_SET: &[u8] = - tonic::include_file_descriptor_set!("service_descriptor"); -} +#[derive(Debug, Serialize, Deserialize)] pub enum ServiceState { Idle, Proving, @@ -30,55 +22,26 @@ pub trait Service { fn status(&self) -> ServiceState; } -#[derive(Debug, Default)] -pub struct OperatorService { - service: Arc, -} - -#[tonic::async_trait] -impl post_v1::operator_service_server::OperatorService - for OperatorService +pub async fn run(listener: TcpListener, service: Arc) -> eyre::Result<()> +where + S: Service + Sync + Send + 'static, { - async fn status( - &self, - request: Request, - ) -> Result, Status> { - log::debug!("got a request from {:?}", request.remote_addr()); + log::info!("running operator service on {}", listener.local_addr()?); - let status = match self.service.status() { - ServiceState::Idle => post_v1::operator_status_response::Status::Idle, - ServiceState::Proving => post_v1::operator_status_response::Status::Proving, - }; + let app = Router::new() + .route("/status", get(status)) + .with_state(service); - Ok(Response::new(OperatorStatusResponse { - status: status as _, - })) - } + axum::serve(listener, app) + .await + .map_err(|e| eyre::eyre!("failed to serve: {e}")) } -#[derive(Debug, Default)] -pub struct OperatorServer {} - -impl OperatorServer { - pub async fn run(listener: TcpListener, service: Arc) -> eyre::Result<()> - where - S: Service + Sync + 
Send + 'static, - { - log::info!("running operator service on {}", listener.local_addr()?); - - let reflection_service = tonic_reflection::server::Builder::configure() - .register_encoded_file_descriptor_set(post_v1::FILE_DESCRIPTOR_SET) - .build()?; - - let operator_service = OperatorServiceServer::new(OperatorService { service }); - - Server::builder() - .add_service(reflection_service) - .add_service(operator_service) - .serve_with_incoming(TcpListenerStream::new(listener)) - .await - .map_err(|e| eyre::eyre!("failed to serve: {e}")) - } +async fn status(State(service): State>) -> Json +where + S: Service + Sync + Send + 'static, +{ + Json(service.status()) } #[cfg(test)] @@ -87,10 +50,6 @@ mod tests { use tokio::net::TcpListener; - use super::post_v1::operator_service_client::OperatorServiceClient; - use super::post_v1::operator_status_response::Status; - use super::post_v1::OperatorStatusRequest; - #[tokio::test] async fn test_status() { let mut svc = super::MockService::new(); @@ -104,16 +63,19 @@ mod tests { let listener = TcpListener::bind("localhost:0").await.unwrap(); let addr: std::net::SocketAddr = listener.local_addr().unwrap(); - tokio::spawn(super::OperatorServer::run(listener, Arc::new(svc))); + tokio::spawn(super::run(listener, Arc::new(svc))); + + let url = format!("http://{addr}/status"); + let resp = reqwest::get(&url).await.unwrap(); + let status: super::ServiceState = resp.json().await.unwrap(); + assert!(matches!(status, super::ServiceState::Idle)); - let mut client = OperatorServiceClient::connect(format!("http://{addr}")) + let resp = reqwest::get(&url) .await + .unwrap() + .error_for_status() .unwrap(); - - let response = client.status(OperatorStatusRequest {}).await.unwrap(); - assert_eq!(response.into_inner().status(), Status::Idle); - - let response = client.status(OperatorStatusRequest {}).await.unwrap(); - assert_eq!(response.into_inner().status(), Status::Proving); + let status: super::ServiceState = resp.json().await.unwrap(); + 
assert!(matches!(status, super::ServiceState::Proving)); } } diff --git a/service/tests/test_operator.rs b/service/tests/test_operator.rs index bc64b12f..2a5a919c 100644 --- a/service/tests/test_operator.rs +++ b/service/tests/test_operator.rs @@ -1,22 +1,14 @@ use core::{panic, time}; use std::{sync::Arc, time::Duration}; +use post_service::operator; use tokio::{net::TcpListener, time::sleep}; use post::{ initialize::{CpuInitializer, Initialize}, pow::randomx::RandomXFlag, }; -use post_service::{ - client::spacemesh_v1::{service_response, GenProofStatus}, - operator::{ - post_v1::{ - operator_service_client::OperatorServiceClient, operator_status_response::Status, - OperatorStatusRequest, - }, - OperatorServer, - }, -}; +use post_service::client::spacemesh_v1::{service_response, GenProofStatus}; #[allow(dead_code)] mod server; @@ -30,7 +22,6 @@ async fn test_gen_proof_in_progress() { let cfg = post::config::ProofConfig { k1: 8, k2: 4, - k3: 4, pow_difficulty: [0xFF; 32], }; let init_cfg = post::config::InitConfig { @@ -73,18 +64,20 @@ async fn test_gen_proof_in_progress() { // Create operator server and client let listener = TcpListener::bind("localhost:0").await.unwrap(); let operator_addr = format!("http://{}", listener.local_addr().unwrap()); - tokio::spawn(OperatorServer::run(listener, service)); - let mut client = OperatorServiceClient::connect(operator_addr).await.unwrap(); + tokio::spawn(operator::run(listener, service)); + let status_url = format!("{operator_addr}/status"); + let resp = reqwest::get(&status_url).await.unwrap(); + let status = resp.json().await.unwrap(); // It starts in idle state - let response = client.status(OperatorStatusRequest {}).await.unwrap(); - assert_eq!(response.into_inner().status(), Status::Idle); + assert!(matches!(status, operator::ServiceState::Idle)); // It transforms to Proving when a proof generation starts let connected = test_server.connected.recv().await.unwrap(); TestServer::generate_proof(&connected, vec![0xCA; 
32]).await; - let response = client.status(OperatorStatusRequest {}).await.unwrap(); - assert_eq!(response.into_inner().status(), Status::Proving); + let resp = reqwest::get(&status_url).await.unwrap(); + let status = resp.json().await.unwrap(); + assert!(matches!(status, operator::ServiceState::Proving)); loop { let response = TestServer::generate_proof(&connected, vec![0xCA; 32]).await; @@ -106,6 +99,7 @@ async fn test_gen_proof_in_progress() { } // It transforms back to Idle when the proof generation finishes - let response = client.status(OperatorStatusRequest {}).await.unwrap(); - assert_eq!(response.into_inner().status(), Status::Idle); + let resp = reqwest::get(&status_url).await.unwrap(); + let status = resp.json().await.unwrap(); + assert!(matches!(status, operator::ServiceState::Idle)); } From 3ca7969679c5abeb108ac14b91ba8500fd340994 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Wed, 14 Feb 2024 17:45:32 +0100 Subject: [PATCH 6/7] Report proving progress on operator /status endpoint --- Cargo.lock | 11 ++ benches/verifying.rs | 14 ++- certifier/tests/test_certify.rs | 14 ++- ffi/src/post_impl.rs | 11 +- service/Cargo.toml | 1 + service/src/operator.rs | 26 +++- service/src/service.rs | 202 +++++++++++++++++++++++--------- service/tests/test_operator.rs | 29 +++-- src/prove.rs | 66 +++++++---- tests/generate_and_verify.rs | 30 ++++- 10 files changed, 295 insertions(+), 109 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0b77a697..88b12d5c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2793,6 +2793,16 @@ dependencies = [ "thiserror", ] +[[package]] +name = "range-set" +version = "0.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "714bc4849c399f77ab82177e9b4f012e02f3a7d6191023e016334edde5b21050" +dependencies = [ + "num-traits 0.2.17", + "smallvec", +] + [[package]] name = "raw-cpuid" version = "10.7.0" @@ -3350,6 +3360,7 @@ dependencies = [ "mockall", "post-rs", "prost", + 
"range-set", "rcgen", "reqwest", "rstest 0.18.2", diff --git a/benches/verifying.rs b/benches/verifying.rs index dc2ee48d..d68def8b 100644 --- a/benches/verifying.rs +++ b/benches/verifying.rs @@ -6,7 +6,7 @@ use post::{ initialize::{CpuInitializer, Initialize}, metadata::ProofMetadata, pow::randomx::{PoW, RandomXFlag}, - prove::generate_proof, + prove::{generate_proof, NoopProgressReporter}, verification::{Mode, Verifier}, }; #[cfg(not(windows))] @@ -44,7 +44,17 @@ fn verifying(c: &mut Criterion) { let pow_flags = RandomXFlag::get_recommended_flags(); // Generate a proof let stop = AtomicBool::new(false); - let proof = generate_proof(datadir.path(), challenge, cfg, 32, 1, pow_flags, stop).unwrap(); + let proof = generate_proof( + datadir.path(), + challenge, + cfg, + 32, + 1, + pow_flags, + stop, + NoopProgressReporter {}, + ) + .unwrap(); let metadata = ProofMetadata::new(metadata, *challenge); // Bench verifying the proof diff --git a/certifier/tests/test_certify.rs b/certifier/tests/test_certify.rs index 19732d02..67a0acb0 100644 --- a/certifier/tests/test_certify.rs +++ b/certifier/tests/test_certify.rs @@ -7,7 +7,7 @@ use post::{ initialize::{CpuInitializer, Initialize}, metadata::ProofMetadata, pow::randomx::RandomXFlag, - prove::generate_proof, + prove::{self, generate_proof}, }; use reqwest::StatusCode; use tokio::net::TcpListener; @@ -45,7 +45,17 @@ async fn test_certificate_post_proof() { // Generate a proof let pow_flags = RandomXFlag::get_recommended_flags(); let stop = AtomicBool::new(false); - let proof = generate_proof(datadir.path(), challenge, cfg, 32, 1, pow_flags, stop).unwrap(); + let proof = generate_proof( + datadir.path(), + challenge, + cfg, + 32, + 1, + pow_flags, + stop, + prove::NoopProgressReporter {}, + ) + .unwrap(); let metadata = ProofMetadata::new(metadata, *challenge); // Spawn the certifier service diff --git a/ffi/src/post_impl.rs b/ffi/src/post_impl.rs index bac634cf..480e8f0d 100644 --- a/ffi/src/post_impl.rs +++ 
b/ffi/src/post_impl.rs @@ -103,7 +103,16 @@ fn _generate_proof( let challenge = challenge.try_into()?; let stop = AtomicBool::new(false); - let proof = prove::generate_proof(datadir, challenge, cfg, nonces, threads, pow_flags, stop)?; + let proof = prove::generate_proof( + datadir, + challenge, + cfg, + nonces, + threads, + pow_flags, + stop, + prove::NoopProgressReporter {}, + )?; Ok(Box::new(Proof::from(proof))) } diff --git a/service/Cargo.toml b/service/Cargo.toml index e242b8fb..13e4dce6 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -28,6 +28,7 @@ mockall = "0.11.4" sysinfo = "0.29.10" axum = "0.7.4" serde = "1.0.196" +range-set = "0.0.11" [build-dependencies] tonic-build = "0.11.0" diff --git a/service/src/operator.rs b/service/src/operator.rs index 914418fb..42a55ef5 100644 --- a/service/src/operator.rs +++ b/service/src/operator.rs @@ -3,16 +3,26 @@ //! It exposes an HTTP API. //! Allows to query the status of the post service. -use std::sync::Arc; +use std::{ops::Range, sync::Arc}; use axum::{extract::State, routing::get, Json, Router}; use serde::{Deserialize, Serialize}; use tokio::net::TcpListener; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +/// The Post-service state pub enum ServiceState { + /// The service is idle. Idle, - Proving, + /// The service is currently proving. + Proving { + /// The range of nonces being proven in the current data pass. + nonces: Range, + /// The position (in bytes) in the POST data that is already checked. + position: u64, + }, + /// Finished proving, but the proof has not been fetched yet. 
+ DoneProving, } #[mockall::automock] @@ -56,9 +66,13 @@ mod tests { svc.expect_status() .once() .returning(|| super::ServiceState::Idle); + let proving_status = super::ServiceState::Proving { + nonces: 0..64, + position: 1000, + }; svc.expect_status() .once() - .returning(|| super::ServiceState::Proving); + .return_const(proving_status.clone()); let listener = TcpListener::bind("localhost:0").await.unwrap(); let addr: std::net::SocketAddr = listener.local_addr().unwrap(); @@ -75,7 +89,7 @@ mod tests { .unwrap() .error_for_status() .unwrap(); - let status: super::ServiceState = resp.json().await.unwrap(); - assert!(matches!(status, super::ServiceState::Proving)); + + assert_eq!(proving_status, resp.json().await.unwrap()); } } diff --git a/service/src/service.rs b/service/src/service.rs index cacbcc94..90c20943 100644 --- a/service/src/service.rs +++ b/service/src/service.rs @@ -1,6 +1,7 @@ //! Post Service use std::{ + ops::{Range, RangeInclusive}, path::PathBuf, sync::{atomic::AtomicBool, Arc, Mutex}, }; @@ -9,10 +10,12 @@ use eyre::Context; use post::{ metadata::{PostMetadata, ProofMetadata}, pow::randomx::{PoW, RandomXFlag}, - prove::Proof, + prove::{self, Proof}, verification::{Mode, Verifier}, }; +use crate::operator::ServiceState; + #[derive(Debug)] pub enum ProofGenState { InProgress, @@ -20,9 +23,87 @@ pub enum ProofGenState { } #[derive(Debug)] -struct ProofGenProcess { - handle: std::thread::JoinHandle>>, - challenge: Vec, +enum ProofGenProcess { + Idle, + Running { + handle: Option>>>, + challenge: Vec, + progress: ProvingProgress, + }, + Done { + proof: eyre::Result>, + }, +} + +impl ProofGenProcess { + fn check_finished(&mut self) { + if let ProofGenProcess::Running { handle, .. 
} = self { + if handle.as_ref().unwrap().is_finished() { + let proof = match handle.take().unwrap().join() { + Ok(result) => result, + Err(err) => { + std::panic::resume_unwind(err); + } + }; + *self = ProofGenProcess::Done { proof }; + } + } + } +} + +#[derive(Clone, Debug, Default)] +struct ProvingProgress { + inner: Arc>, +} + +#[derive(Clone, Debug)] +struct ProvingProgressInner { + nonces: std::ops::Range, + chunks: range_set::RangeSet<[RangeInclusive; 10]>, +} + +impl Default for ProvingProgressInner { + fn default() -> Self { + Self { + nonces: 0..0, + chunks: range_set::RangeSet::new(), + } + } +} + +impl prove::ProgressReporter for ProvingProgress { + fn finished_chunk(&self, pos: u64, len: usize) { + if len == 0 { + return; + } + + self.inner + .lock() + .unwrap() + .chunks + .insert_range(pos..=(pos + len as u64 - 1)); + } + + fn new_nonce_group(&self, nonces: std::ops::Range) { + let mut progress = self.inner.lock().unwrap(); + progress.nonces = nonces; + progress.chunks.clear(); + } +} + +impl ProvingProgress { + fn get(&self) -> (Range, u64) { + let progress = self.inner.lock().unwrap(); + ( + progress.nonces.clone(), + progress + .chunks + .as_ref() + .iter() + .next() + .map_or(0, |r| *r.end() + 1), + ) + } } pub struct PostService { @@ -32,7 +113,7 @@ pub struct PostService { nonces: usize, threads: usize, pow_flags: RandomXFlag, - proof_generation: Mutex>, + proof_generation: Mutex, stop: Arc, } @@ -47,7 +128,7 @@ impl PostService { pow_flags: RandomXFlag, ) -> eyre::Result { Ok(Self { - proof_generation: Mutex::new(None), + proof_generation: Mutex::new(ProofGenProcess::Idle), datadir, cfg, init_cfg, @@ -60,55 +141,54 @@ impl PostService { } impl crate::client::PostService for PostService { - fn gen_proof(&self, challenge: Vec) -> eyre::Result { + fn gen_proof(&self, ch: Vec) -> eyre::Result { let mut proof_gen = self.proof_generation.lock().unwrap(); - if let Some(process) = proof_gen.as_mut() { - eyre::ensure!( - process.challenge == challenge, 
- "proof generation is in progress for a different challenge (current: {:X?}, requested: {:X?})", process.challenge, challenge, + proof_gen.check_finished(); + match &*proof_gen { + ProofGenProcess::Running { challenge, .. } => { + eyre::ensure!( + challenge == &ch, + "proof generation is in progress for a different challenge (current: {:X?}, requested: {:X?})", challenge, ch, ); - - if process.handle.is_finished() { + return Ok(ProofGenState::InProgress); + } + ProofGenProcess::Idle => { + let challenge: [u8; 32] = ch + .as_slice() + .try_into() + .map_err(|_| eyre::eyre!("invalid challenge format"))?; + log::info!("starting proof generation for challenge {challenge:X?}"); + let pow_flags = self.pow_flags; + let cfg = self.cfg; + let datadir = self.datadir.clone(); + let nonces = self.nonces; + let threads = self.threads; + let stop = self.stop.clone(); + let progress = ProvingProgress::default(); + let reporter = progress.clone(); + *proof_gen = ProofGenProcess::Running { + challenge: ch, + handle: Some(std::thread::spawn(move || { + post::prove::generate_proof( + &datadir, &challenge, cfg, nonces, threads, pow_flags, stop, reporter, + ) + })), + progress, + }; + } + ProofGenProcess::Done { proof } => { log::info!("proof generation is finished"); - let result = match proof_gen.take().unwrap().handle.join() { - Ok(result) => result, - Err(err) => { - std::panic::resume_unwind(err); - } + let result = match proof { + Ok(proof) => Ok(ProofGenState::Finished { + proof: proof.clone(), + }), + Err(e) => Err(eyre::eyre!("proof generation failed: {}", e)), }; - - match result { - Ok(proof) => { - return Ok(ProofGenState::Finished { proof }); - } - Err(e) => { - return Err(e); - } - } - } else { - log::info!("proof generation in progress"); - return Ok(ProofGenState::InProgress); + *proof_gen = ProofGenProcess::Idle; + return result; } } - let ch: [u8; 32] = challenge - .as_slice() - .try_into() - .map_err(|_| eyre::eyre!("invalid challenge format"))?; - 
log::info!("starting proof generation for challenge {ch:X?}"); - let pow_flags = self.pow_flags; - let cfg = self.cfg; - let datadir = self.datadir.clone(); - let nonces = self.nonces; - let threads = self.threads; - let stop = self.stop.clone(); - *proof_gen = Some(ProofGenProcess { - challenge, - handle: std::thread::spawn(move || { - post::prove::generate_proof(&datadir, &ch, cfg, nonces, threads, pow_flags, stop) - }), - }); - Ok(ProofGenState::InProgress) } @@ -127,12 +207,19 @@ impl crate::client::PostService for PostService { } impl crate::operator::Service for PostService { - fn status(&self) -> crate::operator::ServiceState { - let proof_gen = self.proof_generation.lock().unwrap(); - if proof_gen.as_ref().is_some() { - crate::operator::ServiceState::Proving - } else { - crate::operator::ServiceState::Idle + fn status(&self) -> ServiceState { + let mut proof_gen = self.proof_generation.lock().unwrap(); + proof_gen.check_finished(); + match &*proof_gen { + ProofGenProcess::Running { progress, .. } => { + let (nonces, offset) = progress.get(); + ServiceState::Proving { + nonces, + position: offset, + } + } + ProofGenProcess::Idle => ServiceState::Idle, + ProofGenProcess::Done { .. } => ServiceState::DoneProving, } } } @@ -140,10 +227,11 @@ impl crate::operator::Service for PostService { impl Drop for PostService { fn drop(&mut self) { log::info!("shutting down post service"); - if let Some(process) = self.proof_generation.lock().unwrap().take() { - log::debug!("killing proof generation process"); + if let ProofGenProcess::Running { handle, .. 
} = &mut *self.proof_generation.lock().unwrap() + { + log::debug!("stopping proof generation process"); self.stop.store(true, std::sync::atomic::Ordering::Relaxed); - let _ = process.handle.join().unwrap(); + let _ = handle.take().unwrap().join().unwrap(); log::debug!("proof generation process exited"); } } diff --git a/service/tests/test_operator.rs b/service/tests/test_operator.rs index 2a5a919c..ee922c36 100644 --- a/service/tests/test_operator.rs +++ b/service/tests/test_operator.rs @@ -1,7 +1,7 @@ use core::{panic, time}; use std::{sync::Arc, time::Duration}; -use post_service::operator; +use post_service::operator::{self, ServiceState}; use tokio::{net::TcpListener, time::sleep}; use post::{ @@ -21,13 +21,13 @@ async fn test_gen_proof_in_progress() { let cfg = post::config::ProofConfig { k1: 8, - k2: 4, + k2: 12, pow_difficulty: [0xFF; 32], }; let init_cfg = post::config::InitConfig { min_num_units: 1, max_num_units: 100, - labels_per_unit: 256, + labels_per_unit: 2560, scrypt: post::config::ScryptParams::new(2, 1, 1), }; @@ -70,36 +70,35 @@ async fn test_gen_proof_in_progress() { let resp = reqwest::get(&status_url).await.unwrap(); let status = resp.json().await.unwrap(); // It starts in idle state - assert!(matches!(status, operator::ServiceState::Idle)); + assert!(matches!(status, ServiceState::Idle)); // It transforms to Proving when a proof generation starts let connected = test_server.connected.recv().await.unwrap(); - TestServer::generate_proof(&connected, vec![0xCA; 32]).await; - let resp = reqwest::get(&status_url).await.unwrap(); - let status = resp.json().await.unwrap(); - assert!(matches!(status, operator::ServiceState::Proving)); loop { let response = TestServer::generate_proof(&connected, vec![0xCA; 32]).await; + let status_resp = reqwest::get(&status_url).await.unwrap(); + let status = status_resp.json().await.unwrap(); + if let Some(service_response::Kind::GenProof(resp)) = response.kind { match resp.status() { GenProofStatus::Ok => { if 
resp.proof.is_some() { + assert!(matches!(status, ServiceState::Idle)); break; } + assert!(matches!( + status, + ServiceState::Proving { .. } | ServiceState::DoneProving + )); } _ => { - panic!("Got error response from GenProof"); + panic!("got error response"); } } } else { - unreachable!(); + panic!("got wrong response kind"); } sleep(Duration::from_millis(10)).await; } - - // It transforms back to Idle when the proof generation finishes - let resp = reqwest::get(&status_url).await.unwrap(); - let status = resp.json().await.unwrap(); - assert!(matches!(status, operator::ServiceState::Idle)); } diff --git a/src/prove.rs b/src/prove.rs index 338e516b..f232da32 100644 --- a/src/prove.rs +++ b/src/prove.rs @@ -9,6 +9,7 @@ //! TODO: explain use std::borrow::{Borrow, Cow}; + use std::sync::{ atomic::{AtomicBool, Ordering}, Mutex, @@ -18,6 +19,7 @@ use std::{collections::HashMap, ops::Range, path::Path, time::Instant}; use aes::cipher::block_padding::NoPadding; use aes::cipher::BlockEncrypt; use eyre::Context; +use mockall::automock; use primitive_types::U256; use randomx_rs::RandomXFlag; use rayon::prelude::{ParallelBridge, ParallelIterator}; @@ -77,6 +79,19 @@ impl ProvingParams { } } +#[automock] +pub trait ProgressReporter { + fn new_nonce_group(&self, nonces: Range); + fn finished_chunk(&self, position: u64, len: usize); +} + +pub struct NoopProgressReporter {} + +impl ProgressReporter for NoopProgressReporter { + fn new_nonce_group(&self, _: Range) {} + fn finished_chunk(&self, _: u64, _: usize) {} +} + pub trait Prover { fn prove(&self, batch: &[u8], index: u64, consume: F) -> Option<(u32, Vec)> where @@ -265,17 +280,19 @@ impl Prover for Prover8_56 { /// Generate a proof that data is still held, given the challenge. 
#[allow(clippy::too_many_arguments)] -pub fn generate_proof( +pub fn generate_proof( datadir: &Path, challenge: &[u8; 32], cfg: ProofConfig, - nonces: usize, + nonces_size: usize, threads: usize, pow_flags: RandomXFlag, stop: Stopper, + reporter: Reporter, ) -> eyre::Result> where Stopper: Borrow, + Reporter: ProgressReporter + Send + Sync, { let stop = stop.borrow(); let metadata = metadata::load(datadir).wrap_err("loading metadata")?; @@ -283,8 +300,7 @@ where log::info!("generating proof with PoW flags: {pow_flags:?} and params: {params:?}"); let pow_prover = pow::randomx::PoW::new(pow_flags)?; - let mut start_nonce = 0; - let mut end_nonce = start_nonce + nonces as u32; + let mut nonces = 0..nonces_size as u32; let pool = rayon::ThreadPoolBuilder::new() .num_threads(threads) @@ -296,33 +312,30 @@ where if stop.load(Ordering::Relaxed) { eyre::bail!("proof generation was stopped"); } + reporter.new_nonce_group(nonces.clone()); let indexes = Mutex::new(HashMap::>::new()); let pow_time = Instant::now(); let prover = pool.install(|| { - Prover8_56::new( - challenge, - start_nonce..end_nonce, - params, - &pow_prover, - &metadata.node_id, - ) - .wrap_err("creating prover") + let miner_id = &metadata.node_id; + Prover8_56::new(challenge, nonces.clone(), params, &pow_prover, miner_id) + .wrap_err("creating prover") })?; - let pow_mins = pow_time.elapsed().as_secs() / 60; - log::info!("Finished k2pow in {} minutes", pow_mins); + let pow_secs = pow_time.elapsed().as_secs(); + let pow_mins = pow_secs / 60; + log::info!("finished k2pow in {pow_mins}m {}s", pow_secs % 60); let read_time = Instant::now(); let data_reader = read_data(datadir, 1024 * 1024, metadata.max_file_size)?; - log::info!("Started reading POST data"); + log::info!("started reading POST data"); let result = pool.install(|| { data_reader .par_bridge() .take_any_while(|_| !stop.load(Ordering::Relaxed)) .find_map_any(|batch| { - prover.prove( + let res = prover.prove( &batch.data, batch.pos / BLOCK_SIZE as 
u64, |nonce, index| { @@ -334,24 +347,31 @@ where } None }, - ) + ); + reporter.finished_chunk(batch.pos, batch.data.len()); + + res }) }); - - let read_mins = read_time.elapsed().as_secs() / 60; - log::info!("Finished reading POST data in {} minutes", read_mins); + let read_secs = read_time.elapsed().as_secs(); + let read_mins = read_secs / 60; + log::info!( + "finished reading POST data in {read_mins}m {}s", + read_secs % 60 + ); if let Some((nonce, indices)) = result { let num_labels = metadata.num_units as u64 * metadata.labels_per_unit; let pow = prover.get_pow(nonce).unwrap(); - let total_minutes = total_time.elapsed().as_secs() / 60; + let total_secs = total_time.elapsed().as_secs(); + let total_mins = total_secs / 60; - log::info!("Found proof for nonce: {nonce}, pow: {pow} with {indices:?} indices. Proof took {total_minutes} minutes"); + log::info!("found proof for nonce: {nonce}, pow: {pow} with {indices:?} indices. It took {total_mins}m {}s", total_secs % 60); return Ok(Proof::new(nonce, &indices, num_labels, pow)); } - (start_nonce, end_nonce) = (end_nonce, end_nonce + nonces as u32); + nonces = nonces.end..(nonces.end + nonces_size as u32); } } diff --git a/tests/generate_and_verify.rs b/tests/generate_and_verify.rs index 9c2bfcdd..350466be 100644 --- a/tests/generate_and_verify.rs +++ b/tests/generate_and_verify.rs @@ -6,7 +6,7 @@ use post::{ initialize::{CpuInitializer, Initialize}, metadata::ProofMetadata, pow::randomx::{PoW, RandomXFlag}, - prove::{generate_proof, Proof}, + prove::{self, generate_proof, Proof}, verification::{Error, Mode, Verifier}, }; use tempfile::tempdir; @@ -44,7 +44,21 @@ fn test_generate_and_verify() { let pow_flags = RandomXFlag::get_recommended_flags(); // Generate a proof let stop = AtomicBool::new(false); - let proof = generate_proof(datadir.path(), challenge, cfg, 32, 1, pow_flags, stop).unwrap(); + let mut reporter = prove::MockProgressReporter::new(); + reporter.expect_new_nonce_group().once().return_const(()); + 
reporter.expect_finished_chunk().times(1..).return_const(()); + + let proof = generate_proof( + datadir.path(), + challenge, + cfg, + 32, + 1, + pow_flags, + stop, + reporter, + ) + .unwrap(); // Verify the proof let metadata = ProofMetadata::new(metadata, *challenge); @@ -132,7 +146,17 @@ fn test_generate_and_verify_difficulty_msb_not_zero() { let pow_flags = RandomXFlag::get_recommended_flags(); // Generate a proof let stop = AtomicBool::new(false); - let proof = generate_proof(datadir.path(), challenge, cfg, 32, 1, pow_flags, stop).unwrap(); + let proof = generate_proof( + datadir.path(), + challenge, + cfg, + 32, + 1, + pow_flags, + stop, + prove::NoopProgressReporter {}, + ) + .unwrap(); // Verify the proof let metadata = ProofMetadata::new(metadata, *challenge); From 49faffcf8109fea7d6ed14a2a0cfc0ec5da96b1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Wed, 14 Feb 2024 18:01:10 +0100 Subject: [PATCH 7/7] Clean ups --- service/README.md | 33 +++++++++++++++++++++++++-------- service/build.rs | 1 - service/src/operator.rs | 12 +++--------- service/src/service.rs | 19 +++++++------------ 4 files changed, 35 insertions(+), 30 deletions(-) diff --git a/service/README.md b/service/README.md index 707e6203..96b90aaf 100644 --- a/service/README.md +++ b/service/README.md @@ -21,17 +21,34 @@ service --help ``` ## Operator API -The operator API is a set of GRPC endpoints allowing control of the post service. - -The GRPC API supports reflection for easy use with tools like [grpcurl](https://github.com/fullstorydev/grpcurl). +The operator API is a set of HTTP endpoints allowing control of the post service. It is enabled by providing `--operator-address=
`, i.e. `--operator-address=127.0.0.1:50051` CLI argument. ### Example usage #### Querying post service status ```sh -❯ grpcurl -plaintext localhost:50051 spacemesh.v1.PostServiceOperator/Status -{ - "status": "IDLE" -} -``` \ No newline at end of file +# Not doing anything +❯ curl http://localhost:50051/status +"Idle" + +# Proving +❯ curl http://localhost:50051/status +{"Proving":{"nonces":{"start":0,"end":128},"position":0}} + +# Proving, read some data already +❯ curl http://localhost:50051/status +{"Proving":{"nonces":{"start":0,"end":128},"position":10000}} + +# Started second pass +❯ curl http://localhost:50051/status +{"Proving":{"nonces":{"start":128,"end":256},"position":10000}} + +# Finished proving, but the node has not fetched the proof yet +❯ curl http://localhost:50051/status +"DoneProving" + +# Finished proving and the node has fetched the proof +❯ curl http://localhost:50051/status +"Idle" +``` diff --git a/service/build.rs b/service/build.rs index 34df25cb..b4eb8a0a 100644 --- a/service/build.rs +++ b/service/build.rs @@ -1,5 +1,4 @@ fn main() -> Result<(), Box> { tonic_build::configure().compile(&["api/spacemesh/v1/post.proto"], &["api"])?; - Ok(()) } diff --git a/service/src/operator.rs b/service/src/operator.rs index 42a55ef5..800ca5c1 100644 --- a/service/src/operator.rs +++ b/service/src/operator.rs @@ -76,20 +76,14 @@ mod tests { let listener = TcpListener::bind("localhost:0").await.unwrap(); let addr: std::net::SocketAddr = listener.local_addr().unwrap(); + let url = format!("http://{addr}/status"); tokio::spawn(super::run(listener, Arc::new(svc))); - let url = format!("http://{addr}/status"); let resp = reqwest::get(&url).await.unwrap(); - let status: super::ServiceState = resp.json().await.unwrap(); - assert!(matches!(status, super::ServiceState::Idle)); - - let resp = reqwest::get(&url) - .await - .unwrap() - .error_for_status() - .unwrap(); + assert_eq!(super::ServiceState::Idle, resp.json().await.unwrap()); + let resp = 
reqwest::get(&url).await.unwrap(); assert_eq!(proving_status, resp.json().await.unwrap()); } } diff --git a/service/src/service.rs b/service/src/service.rs index 90c20943..b6b64157 100644 --- a/service/src/service.rs +++ b/service/src/service.rs @@ -58,8 +58,11 @@ struct ProvingProgress { #[derive(Clone, Debug)] struct ProvingProgressInner { + // currently processed nonces nonces: std::ops::Range, - chunks: range_set::RangeSet<[RangeInclusive; 10]>, + // already finished chunks of data + // the chunks are automatically merged when possible + chunks: range_set::RangeSet<[RangeInclusive; 20]>, } impl Default for ProvingProgressInner { @@ -77,11 +80,8 @@ impl prove::ProgressReporter for ProvingProgress { return; } - self.inner - .lock() - .unwrap() - .chunks - .insert_range(pos..=(pos + len as u64 - 1)); + let range = pos..=(pos + len as u64 - 1); + self.inner.lock().unwrap().chunks.insert_range(range); } fn new_nonce_group(&self, nonces: std::ops::Range) { @@ -96,12 +96,7 @@ impl ProvingProgress { let progress = self.inner.lock().unwrap(); ( progress.nonces.clone(), - progress - .chunks - .as_ref() - .iter() - .next() - .map_or(0, |r| *r.end() + 1), + progress.chunks.as_ref().first().map_or(0, |r| *r.end() + 1), ) } }