diff --git a/Cargo.lock b/Cargo.lock
index f8011668d3..ff1001a166 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7524,6 +7524,12 @@ dependencies = [
  "digest 0.10.7",
 ]
 
+[[package]]
+name = "peakmem-alloc"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5eb7428a977a472465aced57d8d2335d6167c0ce9c05c283fd6faed3d8d948f6"
+
 [[package]]
 name = "pem"
 version = "3.0.5"
@@ -13186,11 +13192,14 @@ dependencies = [
  "anyhow",
  "base64 0.22.1",
  "bcs",
+ "clap",
  "criterion",
  "enum_dispatch",
  "fastcrypto 0.1.9 (git+https://github.com/MystenLabs/fastcrypto?rev=4db0e90c732bbf7420ca20de808b698883148d9c)",
  "hex",
+ "libc",
  "p256",
+ "peakmem-alloc",
  "rand 0.8.5",
  "reed-solomon-simd",
  "serde",
diff --git a/crates/walrus-core/Cargo.toml b/crates/walrus-core/Cargo.toml
index ca4fc447d1..52429d33e0 100644
--- a/crates/walrus-core/Cargo.toml
+++ b/crates/walrus-core/Cargo.toml
@@ -29,7 +29,10 @@ walrus-test-utils = { workspace = true, optional = true }
 
 [dev-dependencies]
 anyhow.workspace = true
+clap.workspace = true
 criterion.workspace = true
+libc.workspace = true
+peakmem-alloc = "0.3"
 serde_test.workspace = true
 tracing-subscriber.workspace = true
 walrus-test-utils.workspace = true
@@ -47,3 +50,10 @@ harness = false
 [[bench]]
 name = "blob_encoding"
 harness = false
+
+[[bench]]
+name = "encoding_phases"
+harness = false
+
+[[example]]
+name = "profile_encoding"
diff --git a/crates/walrus-core/benches/encoding_phases.rs b/crates/walrus-core/benches/encoding_phases.rs
new file mode 100644
index 0000000000..5b63d6e23a
--- /dev/null
+++ b/crates/walrus-core/benches/encoding_phases.rs
@@ -0,0 +1,248 @@
+// Copyright (c) Walrus Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+// Allowing `unwrap`s in benchmarks.
+#![allow(clippy::unwrap_used)]
+
+//! Phase-level benchmarks for the blob encoding pipeline at production parameters (n_shards=1000).
+//!
+//! Complements `blob_encoding.rs` (end-to-end) and `basic_encoding.rs` (individual RS/merkle ops)
+//! by measuring each phase of `encode_with_metadata()` independently.
+
+use core::{num::NonZeroU16, time::Duration};
+
+use criterion::{AxisScale, BenchmarkId, Criterion, PlotConfiguration};
+use fastcrypto::hash::Blake2b256;
+use walrus_core::{
+    encoding::{EncodingFactory as _, ReedSolomonEncoder, ReedSolomonEncodingConfig},
+    merkle::{MerkleTree, Node, leaf_hash},
+};
+use walrus_test_utils::random_data;
+
+const N_SHARDS: u16 = 1000;
+
+const BLOB_SIZES: &[(u64, &str)] = &[(1 << 20, "1MiB"), (1 << 25, "32MiB"), (1 << 28, "256MiB")];
+
+fn encoding_config() -> ReedSolomonEncodingConfig {
+    ReedSolomonEncodingConfig::new(NonZeroU16::new(N_SHARDS).unwrap())
+}
+
+/// Benchmark secondary encoding: RS-encode each row to produce repair secondary slivers.
+fn secondary_encoding(c: &mut Criterion) {
+    let config = encoding_config();
+    let mut group = c.benchmark_group("secondary_encoding");
+    group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic));
+
+    for (blob_size, size_str) in BLOB_SIZES {
+        let blob = random_data((*blob_size).try_into().unwrap());
+        group.throughput(criterion::Throughput::Bytes(*blob_size));
+
+        let encoder = config.get_blob_encoder(&blob).unwrap();
+        let symbol_size = encoder.symbol_usize();
+        let n_rows: usize = config.n_primary_source_symbols().get().into();
+        let n_cols: usize = config.n_secondary_source_symbols().get().into();
+        let row_len = n_cols * symbol_size;
+
+        // Build row data (same layout as primary slivers' data).
+        let mut rows: Vec<Vec<u8>> = Vec::with_capacity(n_rows);
+        for r in 0..n_rows {
+            let start = r * row_len;
+            let end = (start + row_len).min(blob.len());
+            let mut row = vec![0u8; row_len];
+            if start < blob.len() {
+                row[..end - start].copy_from_slice(&blob[start..end]);
+            }
+            rows.push(row);
+        }
+
+        group.bench_with_input(
+            BenchmarkId::new("encode_rows", size_str),
+            &rows,
+            |b, rows| {
+                b.iter(|| {
+                    let mut enc = ReedSolomonEncoder::new(
+                        NonZeroU16::new(symbol_size.try_into().unwrap()).unwrap(),
+                        config.n_secondary_source_symbols(),
+                        NonZeroU16::new(N_SHARDS).unwrap(),
+                    )
+                    .unwrap();
+                    for row in rows {
+                        let _ = enc.encode_all_repair_symbols(row);
+                    }
+                });
+            },
+        );
+    }
+
+    group.finish();
+}
+
+/// Benchmark primary encoding: RS-encode each column to produce all primary symbols.
+fn primary_encoding(c: &mut Criterion) {
+    let config = encoding_config();
+    let mut group = c.benchmark_group("primary_encoding");
+    group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic));
+
+    for (blob_size, size_str) in BLOB_SIZES {
+        let blob = random_data((*blob_size).try_into().unwrap());
+        group.throughput(criterion::Throughput::Bytes(*blob_size));
+
+        let encoder = config.get_blob_encoder(&blob).unwrap();
+        let symbol_size = encoder.symbol_usize();
+        let n_shards: usize = N_SHARDS.into();
+        let n_rows: usize = config.n_primary_source_symbols().get().into();
+
+        // Build column data (each column = one secondary sliver's symbols data).
+        let col_len = n_rows * symbol_size;
+        let columns: Vec<Vec<u8>> = (0..n_shards).map(|_| random_data(col_len)).collect();
+
+        group.bench_with_input(
+            BenchmarkId::new("encode_columns", size_str),
+            &columns,
+            |b, columns| {
+                b.iter(|| {
+                    let mut enc = ReedSolomonEncoder::new(
+                        NonZeroU16::new(symbol_size.try_into().unwrap()).unwrap(),
+                        config.n_primary_source_symbols(),
+                        NonZeroU16::new(N_SHARDS).unwrap(),
+                    )
+                    .unwrap();
+                    for col in columns {
+                        let _ = enc.encode_all_ref(col);
+                    }
+                });
+            },
+        );
+    }
+
+    group.finish();
+}
+
+/// Benchmark primary encoding + leaf hashing of each symbol.
+fn primary_encoding_with_hashing(c: &mut Criterion) {
+    let config = encoding_config();
+    let mut group = c.benchmark_group("primary_encoding_with_hashing");
+    group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic));
+
+    for (blob_size, size_str) in BLOB_SIZES {
+        let blob = random_data((*blob_size).try_into().unwrap());
+        group.throughput(criterion::Throughput::Bytes(*blob_size));
+
+        let encoder = config.get_blob_encoder(&blob).unwrap();
+        let symbol_size = encoder.symbol_usize();
+        let n_shards: usize = N_SHARDS.into();
+        let n_rows: usize = config.n_primary_source_symbols().get().into();
+
+        let col_len = n_rows * symbol_size;
+        let columns: Vec<Vec<u8>> = (0..n_shards).map(|_| random_data(col_len)).collect();
+
+        group.bench_with_input(
+            BenchmarkId::new("encode_columns_and_hash", size_str),
+            &columns,
+            |b, columns| {
+                b.iter(|| {
+                    let mut enc = ReedSolomonEncoder::new(
+                        NonZeroU16::new(symbol_size.try_into().unwrap()).unwrap(),
+                        config.n_primary_source_symbols(),
+                        NonZeroU16::new(N_SHARDS).unwrap(),
+                    )
+                    .unwrap();
+                    let mut hashes = vec![Node::Empty; n_shards * n_shards];
+                    for (col_index, col) in columns.iter().enumerate() {
+                        let symbols = enc.encode_all_ref(col).unwrap();
+                        for (row_index, symbol) in symbols.to_symbols().enumerate() {
+                            hashes[n_shards * row_index + col_index] =
+                                leaf_hash::<Blake2b256>(symbol);
+                        }
+                    }
+                });
+            },
+        );
+    }
+
+    group.finish();
+}
+
+/// Benchmark metadata computation from pre-computed symbol hashes (Merkle tree construction).
+fn metadata_from_hashes(c: &mut Criterion) {
+    let mut group = c.benchmark_group("metadata_from_hashes");
+    group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic));
+
+    for (blob_size, size_str) in BLOB_SIZES {
+        group.throughput(criterion::Throughput::Bytes(*blob_size));
+
+        let n_shards: usize = N_SHARDS.into();
+        // Pre-compute random hashes to isolate metadata construction time.
+        let hashes: Vec<Node> = (0..n_shards * n_shards)
+            .map(|i| {
+                let data = i.to_le_bytes();
+                leaf_hash::<Blake2b256>(&data)
+            })
+            .collect();
+
+        group.bench_with_input(
+            BenchmarkId::new("build_merkle_trees", size_str),
+            &hashes,
+            |b, hashes| {
+                b.iter(|| {
+                    // Build 2 * n_shards Merkle trees (primary + secondary per sliver pair).
+                    for sliver_index in 0..n_shards {
+                        let _primary = MerkleTree::<Blake2b256>::build_from_leaf_hashes(
+                            hashes[n_shards * sliver_index..n_shards * (sliver_index + 1)]
+                                .iter()
+                                .cloned(),
+                        );
+                        let _secondary = MerkleTree::<Blake2b256>::build_from_leaf_hashes(
+                            (0..n_shards).map(|symbol_index| {
+                                hashes[n_shards * symbol_index + n_shards - 1 - sliver_index]
+                                    .clone()
+                            }),
+                        );
+                    }
+                });
+            },
+        );
+    }
+
+    group.finish();
+}
+
+/// Benchmark the full `encode_with_metadata()` pipeline for comparison.
+fn full_pipeline(c: &mut Criterion) {
+    let config = encoding_config();
+    let mut group = c.benchmark_group("full_pipeline");
+    group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic));
+
+    for (blob_size, size_str) in BLOB_SIZES {
+        let blob = random_data((*blob_size).try_into().unwrap());
+        group.throughput(criterion::Throughput::Bytes(*blob_size));
+
+        group.bench_with_input(
+            BenchmarkId::new("encode_with_metadata", size_str),
+            &blob,
+            |b, blob| {
+                b.iter(|| {
+                    let encoder = config.get_blob_encoder(blob).unwrap();
+                    let _result = encoder.encode_with_metadata();
+                });
+            },
+        );
+    }
+
+    group.finish();
+}
+
+fn main() {
+    let mut criterion = Criterion::default()
+        .configure_from_args()
+        .sample_size(10)
+        .warm_up_time(Duration::from_millis(10));
+
+    secondary_encoding(&mut criterion);
+    primary_encoding(&mut criterion);
+    primary_encoding_with_hashing(&mut criterion);
+    metadata_from_hashes(&mut criterion);
+    full_pipeline(&mut criterion);
+
+    criterion.final_summary();
+}
diff --git a/crates/walrus-core/examples/profile_encoding.rs b/crates/walrus-core/examples/profile_encoding.rs
new file mode 100644
index 0000000000..969dc3d538
--- /dev/null
+++ b/crates/walrus-core/examples/profile_encoding.rs
@@ -0,0 +1,237 @@
+// Copyright (c) Walrus Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+// Allowing `unwrap`s in examples.
+#![allow(clippy::unwrap_used)]
+
+//! Profiling binary for blob encoding.
+//!
+//! Runs `encode_with_metadata()` on configurable blob sizes with wall-clock timing.
+//! Designed to be run under `samply record` or `cargo flamegraph` without criterion overhead.
+//!
+//! ```bash
+//! cargo build --release --example profile_encoding
+//! samply record ./target/release/examples/profile_encoding --size 32m
+//! ```
+
+use std::{alloc::System, num::NonZeroU16, time::Instant};
+
+use clap::Parser;
+use peakmem_alloc::{PeakMemAlloc, PeakMemAllocTrait};
+use walrus_core::encoding::{BlobEncoder, ReedSolomonEncodingConfig};
+use walrus_test_utils::random_data;
+
+#[global_allocator]
+static PEAK_ALLOC: PeakMemAlloc<System> = PeakMemAlloc::new(System);
+
+/// Returns the process's peak resident set size in bytes.
+fn get_peak_rss_bytes() -> usize {
+    // SAFETY: `getrusage` only writes into the zero-initialized `rusage` struct we pass it.
+    unsafe {
+        let mut usage: libc::rusage = std::mem::zeroed();
+        libc::getrusage(libc::RUSAGE_SELF, &mut usage);
+        let max_rss = usage.ru_maxrss as usize;
+        // macOS reports bytes, Linux reports KB
+        if cfg!(target_os = "macos") {
+            max_rss
+        } else {
+            max_rss * 1024
+        }
+    }
+}
+
+#[derive(Parser)]
+#[command(about = "Profile blob encoding pipeline")]
+struct Args {
+    /// Blob size (e.g. 1k, 1m, 32m, 256m, 1g)
+    #[arg(long, default_value = "32m", value_parser = parse_size)]
+    size: usize,
+
+    /// Number of shards
+    #[arg(long, default_value_t = 1000)]
+    shards: u16,
+
+    /// Number of iterations
+    #[arg(long, default_value_t = 1)]
+    iterations: u32,
+
+    /// Number of blobs to encode concurrently (simulates multi-blob uploads)
+    #[arg(long, default_value_t = 1)]
+    concurrent_blobs: u32,
+}
+
+/// Parses a human-readable size suffix (`k`/`m`/`g`, case-insensitive) into bytes.
+fn parse_size(s: &str) -> Result<usize, String> {
+    let s = s.to_lowercase();
+    let (num, mult) = if let Some(n) = s.strip_suffix('g') {
+        (n, 1 << 30)
+    } else if let Some(n) = s.strip_suffix('m') {
+        (n, 1 << 20)
+    } else if let Some(n) = s.strip_suffix('k') {
+        (n, 1 << 10)
+    } else {
+        (s.as_str(), 1)
+    };
+    let n: usize = num.parse().map_err(|e| format!("invalid size: {e}"))?;
+    Ok(n * mult)
+}
+
+fn main() {
+    let args = Args::parse();
+    let config = ReedSolomonEncodingConfig::new(NonZeroU16::new(args.shards).unwrap());
+
+    if args.concurrent_blobs > 1 {
+        print!(
+            "blob_size={} shards={} iterations={} concurrent_blobs={}",
+            format_size(args.size),
+            args.shards,
+            args.iterations,
+            args.concurrent_blobs
+        );
+    } else {
+        print!(
+            "blob_size={} shards={} iterations={}",
+            format_size(args.size),
+            args.shards,
+            args.iterations
+        );
+    }
+
+    let blob = random_data(args.size);
+    let symbol_size = {
+        let encoder = config.get_blob_encoder(&blob).unwrap();
+        encoder.symbol_usize()
+    };
+    println!("\nsymbol_size={symbol_size}");
+
+    if args.concurrent_blobs <= 1 {
+        run_single_blob(&args, &config, &blob);
+    } else {
+        run_concurrent_blobs(&args, &config, &blob);
+    }
+}
+
+/// Encodes one blob per iteration, reporting wall time, peak heap, and peak RSS.
+fn run_single_blob(args: &Args, config: &ReedSolomonEncodingConfig, blob: &[u8]) {
+    let mut durations = Vec::with_capacity(args.iterations.try_into().unwrap());
+    let mut max_peak_heap: usize = 0;
+
+    for i in 0..args.iterations {
+        let blob_copy = blob.to_vec();
+        let encoder = config.get_blob_encoder(&blob_copy).unwrap();
+
+        PEAK_ALLOC.reset_peak_memory();
+        let start = Instant::now();
+        let (_sliver_pairs, _metadata) = encoder.encode_with_metadata();
+        let elapsed = start.elapsed();
+        let peak_heap = PEAK_ALLOC.get_peak_memory();
+        let peak_rss = get_peak_rss_bytes();
+
+        durations.push(elapsed);
+        max_peak_heap = max_peak_heap.max(peak_heap);
+        let throughput_mbs = args.size as f64 / elapsed.as_secs_f64() / (1024.0 * 1024.0);
+        let expansion = peak_heap as f64 / args.size as f64;
+        println!(
+            "  iteration {}: {:.3}s ({:.1} MiB/s) peak_heap={} peak_rss={} expansion={:.1}x",
+            i + 1,
+            elapsed.as_secs_f64(),
+            throughput_mbs,
+            format_size(peak_heap),
+            format_size(peak_rss),
+            expansion
+        );
+    }
+
+    if args.iterations > 1 {
+        let total: f64 = durations.iter().map(|d| d.as_secs_f64()).sum();
+        let avg = total / f64::from(args.iterations);
+        let throughput_mbs = args.size as f64 / avg / (1024.0 * 1024.0);
+        println!(
+            "average: {avg:.3}s ({throughput_mbs:.1} MiB/s) max_peak_heap={}",
+            format_size(max_peak_heap)
+        );
+    }
+}
+
+/// Encodes N copies of the blob on N scoped threads per iteration.
+fn run_concurrent_blobs(args: &Args, config: &ReedSolomonEncodingConfig, blob: &[u8]) {
+    let n: usize = args.concurrent_blobs.try_into().unwrap();
+    let mut durations = Vec::with_capacity(args.iterations.try_into().unwrap());
+    let mut max_peak_heap: usize = 0;
+
+    for i in 0..args.iterations {
+        // Pre-generate N blob copies and N encoders.
+        let blob_copies: Vec<Vec<u8>> = (0..n).map(|_| blob.to_vec()).collect();
+        // NOTE(review): the original type annotation (`Vec<BlobEncoder<…>>`) was lost in
+        // extraction garbling; inferred element type used instead — confirm against upstream.
+        let encoders: Vec<_> = blob_copies
+            .iter()
+            .map(|b| config.get_blob_encoder(b).unwrap())
+            .collect();
+
+        PEAK_ALLOC.reset_peak_memory();
+        let start = Instant::now();
+
+        // Use std::thread::scope to spawn N threads, each encoding one blob.
+        let per_blob_elapsed: Vec<_> = std::thread::scope(|s| {
+            let handles: Vec<_> = encoders
+                .into_iter()
+                .map(|encoder| {
+                    s.spawn(move || {
+                        let blob_start = Instant::now();
+                        let (_sliver_pairs, _metadata) = encoder.encode_with_metadata();
+                        blob_start.elapsed()
+                    })
+                })
+                .collect();
+            handles.into_iter().map(|h| h.join().unwrap()).collect()
+        });
+
+        let wall_time = start.elapsed();
+        let peak_heap = PEAK_ALLOC.get_peak_memory();
+        let peak_rss = get_peak_rss_bytes();
+
+        durations.push(wall_time);
+        max_peak_heap = max_peak_heap.max(peak_heap);
+
+        println!(
+            "  iteration {}: {:.3}s total wall time",
+            i + 1,
+            wall_time.as_secs_f64()
+        );
+        for (j, elapsed) in per_blob_elapsed.iter().enumerate() {
+            let throughput_mbs = args.size as f64 / elapsed.as_secs_f64() / (1024.0 * 1024.0);
+            println!(
+                "    blob {}: {:.3}s ({:.1} MiB/s)",
+                j + 1,
+                elapsed.as_secs_f64(),
+                throughput_mbs
+            );
+        }
+        let total_data = args.size * n;
+        let expansion = peak_heap as f64 / total_data as f64;
+        println!(
+            "  peak_heap={} peak_rss={} expansion={:.1}x (per blob: {})",
+            format_size(peak_heap),
+            format_size(peak_rss),
+            expansion,
+            format_size(peak_heap / n)
+        );
+    }
+
+    if args.iterations > 1 {
+        let total: f64 = durations.iter().map(|d| d.as_secs_f64()).sum();
+        let avg = total / f64::from(args.iterations);
+        let total_data = args.size * n;
+        let throughput_mbs = total_data as f64 / avg / (1024.0 * 1024.0);
+        println!(
+            "average: {avg:.3}s ({throughput_mbs:.1} MiB/s) max_peak_heap={}",
+            format_size(max_peak_heap)
+        );
+    }
+}
+
+/// Formats a byte count with a binary-unit suffix (B/KiB/MiB/GiB), truncating.
+fn format_size(bytes: usize) -> String {
+    if bytes >= 1 << 30 {
+        format!("{}GiB", bytes >> 30)
+    } else if bytes >= 1 << 20 {
+        format!("{}MiB", bytes >> 20)
+    } else if bytes >= 1 << 10 {
+        format!("{}KiB", bytes >> 10)
+    } else {
+        format!("{bytes}B")
+    }
+}
diff --git a/crates/walrus-core/src/merkle.rs b/crates/walrus-core/src/merkle.rs
index cd0eef3692..f4bb7162ec 100644
--- a/crates/walrus-core/src/merkle.rs
+++ b/crates/walrus-core/src/merkle.rs
@@ -310,7 +310,7 @@ where
 }
 
 /// Computes the hash of the provided input to be used as a leaf hash of a Merkle tree.
-pub(crate) fn leaf_hash<T>(input: &[u8]) -> Node
+pub fn leaf_hash<T>(input: &[u8]) -> Node
 where
     T: HashFunction<DIGEST_LEN>,
 {