diff --git a/Cargo.lock b/Cargo.lock index 49dfcc3d..88ecc3d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -160,9 +160,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.73" +version = "0.1.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" +checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ "proc-macro2", "quote", @@ -2508,6 +2508,7 @@ dependencies = [ "tokio-stream", "tonic", "tonic-build", + "tonic-reflection", ] [[package]] @@ -2887,6 +2888,19 @@ dependencies = [ "syn 2.0.29", ] +[[package]] +name = "tonic-reflection" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fa37c513df1339d197f4ba21d28c918b9ef1ac1768265f11ecb6b7f1cba1b76" +dependencies = [ + "prost", + "prost-types", + "tokio", + "tokio-stream", + "tonic", +] + [[package]] name = "tower" version = "0.4.13" diff --git a/README.md b/README.md index be17be2b..eea299b6 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,9 @@ Includes: ### Randomx-rs [RandomX](https://github.com/tevador/randomx), that [randomx-rs](https://github.com/spacemeshos/randomx-rs) depends on, requires **cmake**. Follow [these instructions](https://github.com/spacemeshos/randomx-rs#build-dependencies) to install it. +## Post Service +Please refer to [service README](service/README.md) for instructions. + ## Troubleshooting ### Crash on Mac arm64 RandomX is known to misbehave, or even crash on arm64 Macs when using JIT. See this issue for details: https://github.com/tevador/RandomX/issues/262. diff --git a/service/Cargo.toml b/service/Cargo.toml index 6bbbd05d..bc144590 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -11,6 +11,7 @@ path = "src/lib.rs" post-rs = { path = "../" } prost = "0.12.1" tonic = { version = "0.10.0", features = ["tls"] } +tonic-reflection = "0.10.2" tokio = { version = "1.0", features = [ "rt-multi-thread", "macros", diff --git a/service/README.md b/service/README.md new file mode 100644 index 00000000..707e6203 --- /dev/null +++ b/service/README.md @@ -0,0 +1,37 @@ +# Post Service +Post service allows to separate expensive PoST proving from a node by allowing to generate a proof on a different machine. It connects to the node via GRPC (on an address pointed by `--address`) and awaits commands from the node. + +## How to run +First of all, the service currently doesn't support initializing PoST data. The data must be initialized separately (presumably using [postcli](https://github.com/spacemeshos/post/tree/develop/cmd/postcli) and placed in a directory pointed to by `--dir`). + +#### Example running on an un-encrypted channel, with the default configuration of _threads_ and _nonces_ +```sh +service --address=http://my-node-address.org --dir=./post-data +``` + +#### Example running on an encrypted (mTLS) channel, with the custom _threads_ and _nonces_ +For mTLS, you need to pass the certificate and private key of the post-service with `--cert` and `-key`, as well as the CA of the server with `--ca-cert`: +```sh +service --address=https://my-node-address.org --cert=client.pem --key=client-key.pem --ca-cert=server-rootCA.pem --dir=./post-data --threads=8 --nonces=288 +``` + +A full usage/help can be viewed with +```sh +service --help +``` + +## Operator API +The operator API is a set of GRPC endpoints allowing control of the post service. + +The GRPC API supports reflection for easy use with tools like [grpcurl](https://github.com/fullstorydev/grpcurl). + +It is enabled by providing `--operator-address=
`, i.e. `--operator-address=127.0.0.1:50051` CLI argument. + +### Example usage +#### Querying post service status +```sh +❯ grpcurl -plaintext localhost:50051 spacemesh.v1.PostServiceOperator/Status +{ + "status": "IDLE" +} +``` \ No newline at end of file diff --git a/service/api b/service/api index cf5fa12b..f2516cf2 160000 --- a/service/api +++ b/service/api @@ -1 +1 @@ -Subproject commit cf5fa12bf3dcae33f4472b120a7391e42dd68ad0 +Subproject commit f2516cf212831b08fc1e9b84df6679d0eccf5b05 diff --git a/service/build.rs b/service/build.rs index b4eb8a0a..e66f81ca 100644 --- a/service/build.rs +++ b/service/build.rs @@ -1,4 +1,10 @@ +use std::{env, path::PathBuf}; + fn main() -> Result<(), Box> { - tonic_build::configure().compile(&["api/spacemesh/v1/post.proto"], &["api"])?; + let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); + tonic_build::configure() + .file_descriptor_set_path(out_dir.join("post_descriptor.bin")) + .compile(&["api/spacemesh/v1/post.proto"], &["api"])?; + Ok(()) } diff --git a/service/src/lib.rs b/service/src/lib.rs index 431311a8..382b7173 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -1,2 +1,3 @@ pub mod client; +pub mod operator; pub mod service; diff --git a/service/src/main.rs b/service/src/main.rs index 75cf63ac..42607995 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -1,11 +1,12 @@ -use std::{fs::read_to_string, path::PathBuf, time::Duration}; +use std::{fs::read_to_string, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; use clap::{Args, Parser, ValueEnum}; use eyre::Context; +use tokio::net::TcpListener; use tonic::transport::{Certificate, Identity}; use post::pow::randomx::RandomXFlag; -use post_service::client; +use post_service::{client, operator}; /// Post Service #[derive(Parser, Debug)] @@ -29,6 +30,11 @@ struct Cli { #[command(flatten, next_help_heading = "TLS configuration")] tls: Option, + + /// address to listen on for operator service + /// the operator service is disabled if not specified + #[arg(long)] + operator_address: Option, } #[derive(Args, Debug)] @@ -55,6 +61,22 @@ struct PostConfig { scrypt: ScryptParams, } +impl From for post::config::Config { + fn from(val: PostConfig) -> Self { + post::config::Config { + k1: val.k1, + k2: val.k2, + k3: val.k3, + pow_difficulty: val.pow_difficulty, + scrypt: post::ScryptParams::new( + val.scrypt.n.ilog2() as u8 - 1, + val.scrypt.r.ilog2() as u8, + val.scrypt.p.ilog2() as u8, + ), + } + } +} + /// Scrypt parameters for initialization #[derive(Args, Debug)] struct ScryptParams { @@ -161,17 +183,7 @@ async fn main() -> eyre::Result<()> { let service = post_service::service::PostService::new( args.dir, - post::config::Config { - k1: args.post_config.k1, - k2: args.post_config.k2, - k3: args.post_config.k3, - pow_difficulty: args.post_config.pow_difficulty, - scrypt: post::ScryptParams::new( - args.post_config.scrypt.n.ilog2() as u8 - 1, - args.post_config.scrypt.r.ilog2() as u8, - args.post_config.scrypt.p.ilog2() as u8, - ), - }, + args.post_config.into(), args.post_settings.nonces, args.post_settings.threads, args.post_settings.randomx_mode.into(), @@ -199,7 +211,13 @@ async fn main() -> eyre::Result<()> { None }; - let client = client::ServiceClient::new(args.address, args.reconnect_interval_s, tls, service)?; + let service = Arc::new(service); + + if let Some(address) = args.operator_address { + let listener = TcpListener::bind(address).await?; + tokio::spawn(operator::OperatorServer::run(listener, service.clone())); + } + let client = client::ServiceClient::new(args.address, args.reconnect_interval_s, tls, service)?; client.run().await } diff --git a/service/src/operator.rs b/service/src/operator.rs new file mode 100644 index 00000000..e27a8226 --- /dev/null +++ b/service/src/operator.rs @@ -0,0 +1,119 @@ +//! Operator service for controlling the post service. +//! +//! It exposes a GRPC API defined in `spacemesh.v1.post.proto`. +//! Allows to query the status of the post service. + +use std::sync::Arc; + +use tokio::net::TcpListener; +use tokio_stream::wrappers::TcpListenerStream; +use tonic::{transport::Server, Request, Response, Status}; + +use spacemesh_v1::post_service_operator_server::{PostServiceOperator, PostServiceOperatorServer}; +use spacemesh_v1::{PostServiceStatusRequest, PostServiceStatusResponse}; + +pub mod spacemesh_v1 { + tonic::include_proto!("spacemesh.v1"); + pub(crate) const FILE_DESCRIPTOR_SET: &[u8] = + tonic::include_file_descriptor_set!("post_descriptor"); +} + +pub enum ServiceState { + Idle, + Proving, +} + +#[mockall::automock] +/// The Service trait provides funcionality required by the OperatorService. +pub trait Service { + /// Returns the current state of the service. + fn status(&self) -> ServiceState; +} + +#[derive(Debug, Default)] +pub struct OperatorService { + operator: Arc, +} + +#[tonic::async_trait] +impl PostServiceOperator for OperatorService { + async fn status( + &self, + request: Request, + ) -> Result, Status> { + log::debug!("got a request from {:?}", request.remote_addr()); + + let status = match self.operator.status() { + ServiceState::Idle => spacemesh_v1::post_service_status_response::Status::Idle, + ServiceState::Proving => spacemesh_v1::post_service_status_response::Status::Proving, + }; + + Ok(Response::new(PostServiceStatusResponse { + status: status as _, + })) + } +} + +#[derive(Debug, Default)] +pub struct OperatorServer {} + +impl OperatorServer { + pub async fn run(listener: TcpListener, operator: Arc) -> eyre::Result<()> + where + O: Service + Sync + Send + 'static, + { + log::info!("running operator service on {}", listener.local_addr()?); + + let reflection_service = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(spacemesh_v1::FILE_DESCRIPTOR_SET) + .build()?; + + let operator_service = PostServiceOperatorServer::new(OperatorService { operator }); + + Server::builder() + .add_service(reflection_service) + .add_service(operator_service) + .serve_with_incoming(TcpListenerStream::new(listener)) + .await + .map_err(|e| eyre::eyre!("failed to serve: {e}")) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use tokio::net::TcpListener; + + use super::spacemesh_v1::post_service_operator_client::PostServiceOperatorClient; + use super::spacemesh_v1::post_service_status_response::Status; + use super::spacemesh_v1::PostServiceStatusRequest; + + #[tokio::test] + async fn test_status() { + let mut svc_operator = super::MockService::new(); + svc_operator + .expect_status() + .once() + .returning(|| super::ServiceState::Idle); + svc_operator + .expect_status() + .once() + .returning(|| super::ServiceState::Proving); + + let listener = TcpListener::bind("localhost:0").await.unwrap(); + let addr: std::net::SocketAddr = listener.local_addr().unwrap(); + + tokio::spawn(super::OperatorServer::run(listener, Arc::new(svc_operator))); + + let mut client = PostServiceOperatorClient::connect(format!("http://{addr}")) + .await + .unwrap(); + + let response = client.status(PostServiceStatusRequest {}).await.unwrap(); + assert_eq!(response.into_inner().status(), Status::Idle); + + let response = client.status(PostServiceStatusRequest {}).await.unwrap(); + assert_eq!(response.into_inner().status(), Status::Proving); + } +} diff --git a/service/src/service.rs b/service/src/service.rs index f8062795..8aa6933d 100644 --- a/service/src/service.rs +++ b/service/src/service.rs @@ -122,6 +122,17 @@ impl crate::client::PostService for PostService { } } +impl crate::operator::Service for PostService { + fn status(&self) -> crate::operator::ServiceState { + let proof_gen = self.proof_generation.lock().unwrap(); + if proof_gen.as_ref().is_some() { + crate::operator::ServiceState::Proving + } else { + crate::operator::ServiceState::Idle + } + } +} + impl Drop for PostService { fn drop(&mut self) { log::info!("shutting down post service"); diff --git a/service/tests/test_operator.rs b/service/tests/test_operator.rs new file mode 100644 index 00000000..e957f300 --- /dev/null +++ b/service/tests/test_operator.rs @@ -0,0 +1,94 @@ +use core::panic; +use std::{sync::Arc, time::Duration}; + +use tokio::{net::TcpListener, time::sleep}; + +use post::{ + initialize::{CpuInitializer, Initialize}, + pow::randomx::RandomXFlag, + ScryptParams, +}; +use post_service::{ + client::spacemesh_v1::{service_response, GenProofStatus}, + operator::{ + spacemesh_v1::{ + post_service_operator_client::PostServiceOperatorClient, + post_service_status_response::Status, PostServiceStatusRequest, + }, + OperatorServer, + }, +}; + +#[allow(dead_code)] +mod server; +use server::TestServer; + +#[tokio::test] +async fn test_gen_proof_in_progress() { + // Initialize some data + let datadir = tempfile::tempdir().unwrap(); + + let cfg = post::config::Config { + k1: 8, + k2: 4, + k3: 4, + pow_difficulty: [0xFF; 32], + scrypt: ScryptParams::new(0, 0, 0), + }; + + CpuInitializer::new(cfg.scrypt) + .initialize(datadir.path(), &[0xBE; 32], &[0xCE; 32], 256, 4, 256, None) + .unwrap(); + + let pow_flags = RandomXFlag::get_recommended_flags(); + + let service = Arc::new( + post_service::service::PostService::new(datadir.into_path(), cfg, 16, 1, pow_flags) + .unwrap(), + ); + + let mut test_server = TestServer::new().await; + let client = test_server.create_client(service.clone()); + tokio::spawn(client.run()); + + // Create operator server and client + let listener = TcpListener::bind("localhost:0").await.unwrap(); + let operator_addr = format!("http://{}", listener.local_addr().unwrap()); + tokio::spawn(OperatorServer::run(listener, service)); + let mut client = PostServiceOperatorClient::connect(operator_addr) + .await + .unwrap(); + + // It starts in idle state + let response = client.status(PostServiceStatusRequest {}).await.unwrap(); + assert_eq!(response.into_inner().status(), Status::Idle); + + // It transforms to Proving when a proof generation starts + let connected = test_server.connected.recv().await.unwrap(); + TestServer::generate_proof(&connected, vec![0xCA; 32]).await; + let response = client.status(PostServiceStatusRequest {}).await.unwrap(); + assert_eq!(response.into_inner().status(), Status::Proving); + + loop { + let response = TestServer::generate_proof(&connected, vec![0xCA; 32]).await; + if let Some(service_response::Kind::GenProof(resp)) = response.kind { + match resp.status() { + GenProofStatus::Ok => { + if resp.proof.is_some() { + break; + } + } + _ => { + panic!("Got error response from GenProof"); + } + } + } else { + unreachable!(); + } + sleep(Duration::from_millis(10)).await; + } + + // It transforms back to Idle when the proof generation finishes + let response = client.status(PostServiceStatusRequest {}).await.unwrap(); + assert_eq!(response.into_inner().status(), Status::Idle); +}