diff --git a/Cargo.lock b/Cargo.lock index 22a63dd..a01cadf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1833,6 +1833,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "miden-private-transport-node-load-test" +version = "0.1.0" +dependencies = [ + "anyhow", + "chrono", + "clap", + "miden-objects", + "miden-private-transport-client", + "rand 0.9.2", + "tokio", +] + [[package]] name = "miden-private-transport-proto" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index a9976d3..ef2e591 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,13 @@ [workspace] -members = ["bin/cli", "bin/node", "crates/node", "crates/proto", "crates/rust-client", "crates/web-client"] +members = [ + "bin/cli", + "bin/load-test", + "bin/node", + "crates/node", + "crates/proto", + "crates/rust-client", + "crates/web-client", +] resolver = "2" [workspace.package] diff --git a/Makefile b/Makefile index 08918ff..04f39b3 100644 --- a/Makefile +++ b/Makefile @@ -83,7 +83,7 @@ check: ## Check all targets and features for errors without code generation .PHONY: build build: ## Builds all crates and re-builds protobuf bindings for proto crates - cargo build --locked --workspace + BUILD_PROTO=1 cargo build --locked --workspace --all-targets --exclude miden-private-transport-client-web # --- node-docker --------------------------------------------------------------------------------- diff --git a/bin/cli/src/main.rs b/bin/cli/src/main.rs index 04a3114..651157f 100644 --- a/bin/cli/src/main.rs +++ b/bin/cli/src/main.rs @@ -95,6 +95,9 @@ enum Commands { /// Recipient account ID #[arg(long)] recipient: String, + /// Split header / details + #[arg(long)] + split: bool, }, /// Random address (bech32) for testing purposes @@ -146,8 +149,8 @@ async fn main() -> Result<()> { client.register_tag(tag.into())?; println!("✅ Tag {tag} registered successfully"); }, - Commands::TestNote { recipient } => { - mock_note(&recipient)?; + Commands::TestNote { recipient, split } => { + mock_note(&recipient, 
split)?; }, Commands::TestAddress => { test_address(); @@ -261,13 +264,21 @@ async fn cleanup_old_data(client: &TransportLayerClient, days: u32) -> Result<() Ok(()) } -fn mock_note(recipient_address_bech32: &str) -> Result<()> { +fn mock_note(recipient_address_bech32: &str, split: bool) -> Result<()> { use miden_objects::utils::Serializable; let (_, address) = Address::from_bech32(recipient_address_bech32) .map_err(|e| anyhow!("Invalid recipient address {recipient_address_bech32}: {e}"))?; let note = mock_note_p2id_with_addresses(&mock_address(), &address); - let hex_note = hex::encode(note.to_bytes()); - info!("Test note: {}", hex_note); + if split { + let hex_note_header = hex::encode(note.header().to_bytes()); + let hex_note_details = hex::encode(NoteDetails::from(note).to_bytes()); + info!("Test note header: {}", hex_note_header); + info!("Test note details: {}", hex_note_details); + } else { + let hex_note = hex::encode(note.to_bytes()); + info!("Test note: {}", hex_note); + } + Ok(()) } diff --git a/bin/load-test/Cargo.toml b/bin/load-test/Cargo.toml new file mode 100644 index 0000000..f945d6e --- /dev/null +++ b/bin/load-test/Cargo.toml @@ -0,0 +1,29 @@ +[package] +authors.workspace = true +description = "Miden Private Transport Node Load-Testing Tool" +edition.workspace = true +homepage.workspace = true +keywords = ["messaging", "miden"] +license.workspace = true +name = "miden-private-transport-node-load-test" +readme.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[lints] +workspace = true + +[dependencies] +# Workspace/local +miden-private-transport-client = { features = ["sqlite", "tonic"], workspace = true } + +# Miden +miden-objects = { workspace = true } + +# External +anyhow = { workspace = true } +chrono = { workspace = true } +clap = { workspace = true } +rand = { workspace = true } +tokio = { workspace = true } diff --git a/bin/load-test/README.md b/bin/load-test/README.md new file mode 
100644 index 0000000..ab26914 --- /dev/null +++ b/bin/load-test/README.md @@ -0,0 +1,11 @@ +# Miden Private Transport Node Load-Test Tool + +Tests the node implementation by flooding it with different requests. +Success rate, latency, and throughput are measured for each testing scenario. + +## Scenarios + +- `send-note`: Issue "SendNote" requests (one note) to the server; +- `fetch-notes`: Issue "FetchNotes" requests to the server (responses will have `n`-configured notes); +- `mixed`: Issue "SendNote" + "FetchNotes" requests in random order. "FetchNotes" may yield some response notes; +- `req-rep`: Issue one "SendNote" to one "FetchNotes" requests. "FetchNotes" response will yield one note. diff --git a/bin/load-test/src/grpc.rs b/bin/load-test/src/grpc.rs new file mode 100644 index 0000000..2a479c2 --- /dev/null +++ b/bin/load-test/src/grpc.rs @@ -0,0 +1,287 @@ +#![allow(clippy::cast_precision_loss)] + +use std::{ + string::ToString, + time::{Duration, Instant}, +}; + +use anyhow::Result; +use chrono::Utc; +use miden_objects::utils::Serializable; +use miden_private_transport_client::GrpcClient; +use tokio::{sync::mpsc, time::sleep}; + +use super::utils::{TagGeneration, generate_dummy_notes}; +use crate::{RequestResult, StressMetrics}; + +#[derive(Clone)] +pub struct GrpcStress { + endpoint: String, + workers: usize, + requests: usize, + rate: Option, +} + +impl GrpcStress { + pub fn new(endpoint: String, workers: usize, requests: usize, rate: Option) -> Self { + Self { endpoint, workers, requests, rate } + } + + /// Each worker will run the provided fn `req` + async fn work(&self, req: F) -> Result + where + F: Fn(Self, mpsc::UnboundedSender) -> Fut + Send + Sync + Clone + 'static, + Fut: std::future::Future + Send + 'static, + { + let (tx, mut rx) = mpsc::unbounded_channel(); + let mut handles = vec![]; + + let start_time = Instant::now(); + + // Spawn workers + for _ in 0..self.workers { + let cfg = self.clone(); + let req = req.clone(); + let tx = tx.clone(); 
+ + let handle = tokio::spawn(async move { req(cfg, tx).await }); + + handles.push(handle); + } + + // Collect results + let mut total_requests = 0; + let mut successful_requests = 0; + let mut failed_requests = 0; + let mut min_latency = Duration::MAX; + let mut max_latency = Duration::ZERO; + let mut total_latency = Duration::ZERO; + let mut total_size = 0; + + while let Some(result) = rx.recv().await { + total_requests += 1; + + if result.success { + successful_requests += 1; + } else { + failed_requests += 1; + println!("Request failed: {:?}", result.error); + } + + min_latency = min_latency.min(result.latency); + max_latency = max_latency.max(result.latency); + total_latency += result.latency; + total_size += result.size; + + if total_requests >= self.requests { + break; + } + } + + // Wait for all workers to complete + for handle in handles { + let _ = handle.await; + } + + let total_duration = start_time.elapsed(); + let avg_latency = if total_requests > 0 { + Duration::from_nanos(total_latency.as_nanos() as u64 / total_requests as u64) + } else { + Duration::ZERO + }; + + let requests_per_second = if total_duration.as_secs_f64() > 0.0 { + total_requests as f64 / total_duration.as_secs_f64() + } else { + 0.0 + }; + + let throughput_mbs = if total_size > 0 { + (total_size as f64 / f64::from(1024 * 1024)) / total_duration.as_secs_f64() + } else { + 0.0 + }; + + Ok(StressMetrics { + total_requests, + successful_requests, + failed_requests, + total_duration, + min_latency, + max_latency, + avg_latency, + requests_per_second, + throughput_mbs, + }) + } + + pub async fn send_note(&self) -> Result { + println!("Running send-note load test"); + + self.work(|cfg, tx| async move { + let mut client = GrpcClient::connect(cfg.endpoint, 1000).await.unwrap(); + let n_requests = cfg.requests / cfg.workers; + let notes = generate_dummy_notes(n_requests, &TagGeneration::Sequential(0)); + + for (note_header, note_details) in notes { + let size = note_header.get_size_hint() + 
note_details.len(); + + let start = Instant::now(); + let result = client.send_note(note_header, note_details).await; + let latency = start.elapsed(); + + let success = result.is_ok(); + let error = result.err().map(|e| e.to_string()); + + let _ = tx.send(RequestResult { success, latency, error, size }); + + // Rate limiting + if let Some(rate) = cfg.rate { + let delay = Duration::from_secs_f64(1.0 / rate); + sleep(delay).await; + } + } + }) + .await + } + + /// `fetch-notes` stress test + /// + /// Also populates the server with `n` notes for each tag before fetching them. + pub async fn fetch_notes(&self, n: usize) -> Result { + println!("Running fetch-notes {n} load test"); + + let timestamp = Utc::now(); + + println!("Populating..."); + let mut handles = vec![]; + for _ in 0..n { + let cfg = self.clone(); + let handle = tokio::spawn(async move { + let mut client = GrpcClient::connect(cfg.endpoint.clone(), 1000).await.unwrap(); + let notes = generate_dummy_notes(cfg.requests, &TagGeneration::Sequential(0)); + for (note_header, note_details) in notes { + client.send_note(note_header, note_details).await.unwrap(); + } + }); + + handles.push(handle); + } + + for handle in handles { + let _ = handle.await; + } + println!("Fetching..."); + + self.work(move |cfg, tx| async move { + let mut client = GrpcClient::connect(cfg.endpoint, 1000).await.unwrap(); + let n_requests = cfg.requests / cfg.workers; + + let mut tag = super::utils::TAG_LOCAL_ANY; + for _ in 0..n_requests { + tag += 1; + + let start = Instant::now(); + let result = client.fetch_notes(tag.into(), timestamp).await; + let latency = start.elapsed(); + + let success = result.is_ok(); + let error = result.as_ref().err().map(ToString::to_string); + let size: usize = result + .map(|notes| { + notes + .iter() + .map(|note| note.header.get_size_hint() + note.details.len()) + .sum() + }) + .unwrap_or(0); + + let _ = tx.send(RequestResult { success, latency, error, size }); + + // Rate limiting + if let Some(rate) 
= cfg.rate { + let delay = Duration::from_secs_f64(1.0 / rate); + sleep(delay).await; + } + } + }) + .await + } + + pub async fn mixed(&self) -> Result<StressMetrics> { + println!("Running mixed load test (send-note + fetch-notes)"); + + let cfg = Self::new(self.endpoint.clone(), self.workers / 2, self.requests / 2, self.rate); + + // Run both tests and combine metrics + let (send_note_res, fetch_notes_res) = tokio::join!(cfg.send_note(), cfg.fetch_notes(0)); + let (send_note_metrics, fetch_notes_metrics) = + (send_note_res.unwrap(), fetch_notes_res.unwrap()); + + // Combine metrics + Ok(StressMetrics { + total_requests: send_note_metrics.total_requests + fetch_notes_metrics.total_requests, + successful_requests: send_note_metrics.successful_requests + + fetch_notes_metrics.successful_requests, + failed_requests: send_note_metrics.failed_requests + + fetch_notes_metrics.failed_requests, + total_duration: send_note_metrics + .total_duration + .max(fetch_notes_metrics.total_duration), + min_latency: send_note_metrics.min_latency.min(fetch_notes_metrics.min_latency), + max_latency: send_note_metrics.max_latency.max(fetch_notes_metrics.max_latency), + avg_latency: Duration::from_nanos(u128::midpoint( + send_note_metrics.avg_latency.as_nanos(), + fetch_notes_metrics.avg_latency.as_nanos(), + ) as u64), + requests_per_second: send_note_metrics.requests_per_second + + fetch_notes_metrics.requests_per_second, + throughput_mbs: send_note_metrics.throughput_mbs + + fetch_notes_metrics.throughput_mbs, + }) + } + + pub async fn req_rep(&self) -> Result<StressMetrics> { + println!("Running req-rep (1-note, send-note -> fetch_notes)"); + + self.work(|cfg, tx| async move { + let mut client = GrpcClient::connect(cfg.endpoint, 1000).await.unwrap(); + let timestamp = Utc::now(); + let n_requests = cfg.requests / cfg.workers; + + let notes = generate_dummy_notes(n_requests, &TagGeneration::Random); + + for (note_header, note_details) in notes { + let tag = note_header.metadata().tag(); + let start = 
Instant::now(); + let mut size = note_header.get_size_hint() + note_details.len(); + + let mut result = client.send_note(note_header, note_details).await.map(|_| vec![]); + if result.is_ok() { + result = client.fetch_notes(tag, timestamp).await; + } + let latency = start.elapsed(); + + let success = result.is_ok(); + let error = result.as_ref().err().map(ToString::to_string); + size += result + .map(|notes| { + notes + .iter() + .map(|note| note.header.get_size_hint() + note.details.len()) + .sum() + }) + .unwrap_or(0); + + let _ = tx.send(RequestResult { success, latency, error, size }); + + // Rate limiting + if let Some(rate) = cfg.rate { + let delay = Duration::from_secs_f64(1.0 / rate); + sleep(delay).await; + } + } + }) + .await + } +} diff --git a/bin/load-test/src/main.rs b/bin/load-test/src/main.rs new file mode 100644 index 0000000..2dba458 --- /dev/null +++ b/bin/load-test/src/main.rs @@ -0,0 +1,138 @@ +//! Load Testing Tool for Miden Private Transport + +#![allow(clippy::cast_precision_loss)] + +use std::time::Duration; + +use anyhow::Result; +use clap::{Parser, Subcommand}; + +pub mod grpc; +pub mod utils; + +use grpc::GrpcStress; + +#[derive(Parser)] +#[command(name = "miden-private-transport-node-load-test")] +#[command(about = "Load testing tool for Miden Private Transport Node")] +struct Args { + /// Server host + #[arg(long, default_value = "127.0.0.1")] + host: String, + + /// Server port + #[arg(long, default_value = "8080")] + port: u16, + + /// Number of concurrent workers + #[arg(long, default_value = "10")] + workers: usize, + + /// Total number of requests to send + #[arg(long, default_value = "100000")] + requests: usize, + + /// Test scenario to run + #[command(subcommand)] + scenario: Scenario, + + /// Request rate (requests per second per worker) + #[arg(long)] + rate: Option, + + /// Enable verbose logging + #[arg(short, long)] + verbose: bool, +} + +#[derive(Copy, Clone, Debug, Subcommand)] +enum Scenario { + SendNote, + FetchNotes { 
+ /// Fetch `n` notes per request + n: usize, + }, + Mixed, + ReqRep, +} + +#[derive(Debug, Clone)] +pub struct StressMetrics { + total_requests: usize, + successful_requests: usize, + failed_requests: usize, + total_duration: Duration, + min_latency: Duration, + max_latency: Duration, + avg_latency: Duration, + requests_per_second: f64, + throughput_mbs: f64, +} + +#[derive(Debug)] +struct RequestResult { + success: bool, + latency: Duration, + error: Option, + size: usize, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + + let endpoint = format!("http://{}:{}", args.host, args.port); + println!("Starting load test against: {endpoint}"); + + // Run the load test + let metrics = match args.scenario { + Scenario::SendNote => { + GrpcStress::new(endpoint, args.workers, args.requests, args.rate) + .send_note() + .await? + }, + Scenario::FetchNotes { n } => { + GrpcStress::new(endpoint, args.workers, args.requests, args.rate) + .fetch_notes(n) + .await? + }, + Scenario::Mixed => { + GrpcStress::new(endpoint, args.workers, args.requests, args.rate) + .mixed() + .await? + }, + Scenario::ReqRep => { + GrpcStress::new(endpoint, args.workers, args.requests, args.rate) + .req_rep() + .await? 
+ }, + }; + + metrics.print(args.scenario); + + Ok(()) +} + +impl StressMetrics { + fn print(&self, scenario: Scenario) { + println!("\n=== {scenario:?} LOAD TEST RESULTS ==="); + println!("Total Requests: {}", self.total_requests); + println!( + "Successful: {} ({:.1}%)", + self.successful_requests, + (self.successful_requests as f64 / self.total_requests as f64) * 100.0 + ); + println!( + "Failed: {} ({:.1}%)", + self.failed_requests, + (self.failed_requests as f64 / self.total_requests as f64) * 100.0 + ); + println!("Total Duration: {:.2}s", self.total_duration.as_secs_f64()); + println!("Requests/sec: {:.2}", self.requests_per_second); + println!("Min Latency: {:.2}ms", self.min_latency.as_secs_f64() * 1000.0); + println!("Max Latency: {:.2}ms", self.max_latency.as_secs_f64() * 1000.0); + println!("Avg Latency: {:.2}ms", self.avg_latency.as_secs_f64() * 1000.0); + println!("Throughput (MB/sec): {:.2}", self.throughput_mbs); + println!("========================"); + } +} diff --git a/bin/load-test/src/utils.rs b/bin/load-test/src/utils.rs new file mode 100644 index 0000000..50f5f4b --- /dev/null +++ b/bin/load-test/src/utils.rs @@ -0,0 +1,33 @@ +use miden_objects::note::NoteHeader; +use miden_private_transport_client::types::test_note_header; +use rand::Rng; + +const DETAILS_LEN_AVG: usize = 1500; +const DETAILS_LEN_DEV: usize = 100; +pub const TAG_LOCAL_ANY: u32 = 0xc000_0000; + +pub enum TagGeneration { + Sequential(u32), + Random, +} + +pub fn generate_dummy_notes(n: usize, tag_gen: &TagGeneration) -> Vec<(NoteHeader, Vec<u8>)> { + let mut rng = rand::rng(); + + let mut tag = TAG_LOCAL_ANY; + (0..n) + .map(|_| { + tag = match tag_gen { + TagGeneration::Sequential(offset) => tag + 1 + offset, + TagGeneration::Random => TAG_LOCAL_ANY + rng.random_range(0..(1 << 29)), + }; + let header = test_note_header(tag.into()); + let details = vec![ + 0u8; + (DETAILS_LEN_AVG - DETAILS_LEN_DEV) + + rng.random_range(0..DETAILS_LEN_DEV * 2) + ]; + (header, details) + }) + 
.collect() +} diff --git a/bin/node/docker/Dockerfile b/bin/node/docker/Dockerfile index 5a7f7bf..c23b073 100644 --- a/bin/node/docker/Dockerfile +++ b/bin/node/docker/Dockerfile @@ -1,5 +1,5 @@ # Miden Private Transport Node -FROM rust:1.87-slim-bullseye AS builder +FROM rust:1.89-slim-bullseye AS builder RUN apt-get update && \ apt-get -y upgrade && \