Skip to content

Commit

Permalink
Basic operator API
Browse files Browse the repository at this point in the history
  • Loading branch information
poszu committed Oct 25, 2023
1 parent 3a61505 commit a92ecfd
Show file tree
Hide file tree
Showing 11 changed files with 321 additions and 18 deletions.
18 changes: 16 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ Includes:
### Randomx-rs
[RandomX](https://github.com/tevador/randomx), that [randomx-rs](https://github.com/spacemeshos/randomx-rs) depends on, requires **cmake**. Follow [these instructions](https://github.com/spacemeshos/randomx-rs#build-dependencies) to install it.

## Post Service
Please refer to [service README](service/README.md) for instructions.

## Troubleshooting
### Crash on Mac arm64
RandomX is known to misbehave, or even crash on arm64 Macs when using JIT. See this issue for details: https://github.com/tevador/RandomX/issues/262.
Expand Down
1 change: 1 addition & 0 deletions service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ path = "src/lib.rs"
post-rs = { path = "../" }
prost = "0.12.1"
tonic = { version = "0.10.0", features = ["tls"] }
tonic-reflection = "0.10.2"
tokio = { version = "1.0", features = [
"rt-multi-thread",
"macros",
Expand Down
36 changes: 36 additions & 0 deletions service/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Post Service
Post service allows to separate expensive PoST proving from a node by allowing to generate a proof on a different machine. It connects to the node via GRPC (on an address pointed by `--address`) and awaits commands from the node.

## How to run
First of all, the service currently doesn't support initializing PoST data. The data must be initialized separately (presumably using [postcli](https://github.com/spacemeshos/post/tree/develop/cmd/postcli) and placed in a directory pointed to by `--dir`).

#### Example running on an un-encrypted channel, with the default configuration of _threads_ and _nonces_
```sh
service --address=http://my-node-address.org --dir=./post-data
```

#### Example running on an encrypted (TLS) channel, with the custom _threads_ and _nonces_
```sh
service --address=https://my-node-address.org --dir=./post-data --threads=8 --nonces=288
```

A full usage/help can be viewed with
```sh
service --help
```

## Operator API
The operator API is a set of GRPC endpoints allowing control of the post service.

The GRPC API supports reflection for easy use with tools like [grpcurl](https://github.com/fullstorydev/grpcurl).

It is enabled by providing `--operator-address=<address>`, i.e. `--operator-address=127.0.0.1:50051` CLI argument.

### Example usage
#### Querying post service status
```sh
❯ grpcurl -plaintext localhost:50051 spacemesh.v1.PostServiceOperator/Status
{
"status": "IDLE"
}
```
2 changes: 1 addition & 1 deletion service/api
8 changes: 7 additions & 1 deletion service/build.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use std::{env, path::PathBuf};

fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure().compile(&["api/spacemesh/v1/post.proto"], &["api"])?;
let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());
tonic_build::configure()
.file_descriptor_set_path(out_dir.join("post_descriptor.bin"))
.compile(&["api/spacemesh/v1/post.proto"], &["api"])?;

Ok(())
}
1 change: 1 addition & 0 deletions service/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod client;
pub mod operator;
pub mod service;
46 changes: 32 additions & 14 deletions service/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::{fs::read_to_string, path::PathBuf, time::Duration};
use std::{fs::read_to_string, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};

use clap::{Args, Parser, ValueEnum};
use eyre::Context;
use tokio::net::TcpListener;
use tonic::transport::{Certificate, Identity};

use post::pow::randomx::RandomXFlag;
use post_service::client;
use post_service::{client, operator};

/// Post Service
#[derive(Parser, Debug)]
Expand All @@ -29,6 +30,11 @@ struct Cli {

#[command(flatten, next_help_heading = "TLS configuration")]
tls: Option<Tls>,

/// address to listen on for operator service
/// the operator service is disabled if not specified
#[arg(long)]
operator_address: Option<SocketAddr>,
}

#[derive(Args, Debug)]
Expand All @@ -55,6 +61,22 @@ struct PostConfig {
scrypt: ScryptParams,
}

impl From<PostConfig> for post::config::Config {
fn from(val: PostConfig) -> Self {
post::config::Config {
k1: val.k1,
k2: val.k2,
k3: val.k3,
pow_difficulty: val.pow_difficulty,
scrypt: post::ScryptParams::new(
val.scrypt.n.ilog2() as u8 - 1,
val.scrypt.r.ilog2() as u8,
val.scrypt.p.ilog2() as u8,
),
}
}
}

/// Scrypt parameters for initialization
#[derive(Args, Debug)]
struct ScryptParams {
Expand Down Expand Up @@ -161,17 +183,7 @@ async fn main() -> eyre::Result<()> {

let service = post_service::service::PostService::new(
args.dir,
post::config::Config {
k1: args.post_config.k1,
k2: args.post_config.k2,
k3: args.post_config.k3,
pow_difficulty: args.post_config.pow_difficulty,
scrypt: post::ScryptParams::new(
args.post_config.scrypt.n.ilog2() as u8 - 1,
args.post_config.scrypt.r.ilog2() as u8,
args.post_config.scrypt.p.ilog2() as u8,
),
},
args.post_config.into(),
args.post_settings.nonces,
args.post_settings.threads,
args.post_settings.randomx_mode.into(),
Expand Down Expand Up @@ -199,7 +211,13 @@ async fn main() -> eyre::Result<()> {
None
};

let client = client::ServiceClient::new(args.address, args.reconnect_interval_s, tls, service)?;
let service = Arc::new(service);

if let Some(address) = args.operator_address {
let listener = TcpListener::bind(address).await?;
tokio::spawn(operator::OperatorServer::run(listener, service.clone()));
}

let client = client::ServiceClient::new(args.address, args.reconnect_interval_s, tls, service)?;
client.run().await
}
119 changes: 119 additions & 0 deletions service/src/operator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
//! Operator service for controlling the post service.
//!
//! It exposes a GRPC API defined in `spacemesh.v1.post.proto`.
//! Allows to query the status of the post service.
use std::sync::Arc;

use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::{transport::Server, Request, Response, Status};

use spacemesh_v1::post_service_operator_server::{PostServiceOperator, PostServiceOperatorServer};
use spacemesh_v1::{PostServiceStatusRequest, PostServiceStatusResponse};

pub mod spacemesh_v1 {
tonic::include_proto!("spacemesh.v1");
pub(crate) const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("post_descriptor");
}

pub enum ServiceState {
Idle,
Proving,
}

#[mockall::automock]
/// The Service trait provides funcionality required by the OperatorService.
pub trait Service {
/// Returns the current state of the service.
fn status(&self) -> ServiceState;
}

#[derive(Debug, Default)]

Check warning on line 33 in service/src/operator.rs

View check run for this annotation

Codecov / codecov/patch

service/src/operator.rs#L33

Added line #L33 was not covered by tests
pub struct OperatorService<O: Service> {
operator: Arc<O>,
}

#[tonic::async_trait]
impl<O: Service + Sync + Send + 'static> PostServiceOperator for OperatorService<O> {
async fn status(
&self,
request: Request<PostServiceStatusRequest>,
) -> Result<Response<PostServiceStatusResponse>, Status> {
log::debug!("got a request from {:?}", request.remote_addr());

let status = match self.operator.status() {
ServiceState::Idle => spacemesh_v1::post_service_status_response::Status::Idle,
ServiceState::Proving => spacemesh_v1::post_service_status_response::Status::Proving,
};

Ok(Response::new(PostServiceStatusResponse {
status: status as _,
}))
}
}

#[derive(Debug, Default)]

Check warning on line 57 in service/src/operator.rs

View check run for this annotation

Codecov / codecov/patch

service/src/operator.rs#L57

Added line #L57 was not covered by tests
pub struct OperatorServer {}

impl OperatorServer {
pub async fn run<O>(listener: TcpListener, operator: Arc<O>) -> eyre::Result<()>
where
O: Service + Sync + Send + 'static,
{
log::info!("running operator service on {}", listener.local_addr()?);

Check warning on line 65 in service/src/operator.rs

View check run for this annotation

Codecov / codecov/patch

service/src/operator.rs#L65

Added line #L65 was not covered by tests

let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(spacemesh_v1::FILE_DESCRIPTOR_SET)
.build()?;

let operator_service = PostServiceOperatorServer::new(OperatorService { operator });

Server::builder()
.add_service(reflection_service)
.add_service(operator_service)
.serve_with_incoming(TcpListenerStream::new(listener))
.await
.map_err(|e| eyre::eyre!("failed to serve: {e}"))
}

Check warning on line 79 in service/src/operator.rs

View check run for this annotation

Codecov / codecov/patch

service/src/operator.rs#L78-L79

Added lines #L78 - L79 were not covered by tests
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use tokio::net::TcpListener;

use super::spacemesh_v1::post_service_operator_client::PostServiceOperatorClient;
use super::spacemesh_v1::post_service_status_response::Status;
use super::spacemesh_v1::PostServiceStatusRequest;

#[tokio::test]
async fn test_status() {
let mut svc_operator = super::MockService::new();
svc_operator
.expect_status()
.once()
.returning(|| super::ServiceState::Idle);
svc_operator
.expect_status()
.once()
.returning(|| super::ServiceState::Proving);

let listener = TcpListener::bind("localhost:0").await.unwrap();
let addr: std::net::SocketAddr = listener.local_addr().unwrap();

tokio::spawn(super::OperatorServer::run(listener, Arc::new(svc_operator)));

let mut client = PostServiceOperatorClient::connect(format!("http://{addr}"))
.await
.unwrap();

let response = client.status(PostServiceStatusRequest {}).await.unwrap();
assert_eq!(response.into_inner().status(), Status::Idle);

let response = client.status(PostServiceStatusRequest {}).await.unwrap();
assert_eq!(response.into_inner().status(), Status::Proving);
}
}
11 changes: 11 additions & 0 deletions service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,17 @@ impl crate::client::PostService for PostService {
}
}

impl crate::operator::Service for PostService {
fn status(&self) -> crate::operator::ServiceState {
let proof_gen = self.proof_generation.lock().unwrap();
if proof_gen.as_ref().is_some() {
crate::operator::ServiceState::Proving
} else {
crate::operator::ServiceState::Idle
}
}
}

impl Drop for PostService {
fn drop(&mut self) {
log::info!("shutting down post service");
Expand Down
Loading

0 comments on commit a92ecfd

Please sign in to comment.