Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/walrus-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ walrus-test-utils = { workspace = true, optional = true }
anyhow.workspace = true
clap.workspace = true
criterion.workspace = true
libc.workspace = true
peakmem-alloc = "0.3"
serde_test.workspace = true
tracing-subscriber.workspace = true
walrus-test-utils.workspace = true
walrus-utils = { path = "../walrus-utils" }

[lints]
workspace = true
Expand Down
173 changes: 137 additions & 36 deletions crates/walrus-core/examples/profile_encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,30 @@
//! samply record ./target/release/examples/profile_encoding --size 32m
//! ```

use std::{num::NonZeroU16, time::Instant};
use std::{alloc::System, num::NonZeroU16, time::Instant};

use clap::Parser;
use walrus_core::encoding::ReedSolomonEncodingConfig;
use peakmem_alloc::{PeakMemAlloc, PeakMemAllocTrait};
use walrus_core::encoding::{BlobEncoder, ReedSolomonEncodingConfig};
use walrus_test_utils::random_data;
use walrus_utils::size::{format_size, parse_size};

// Global allocator wrapper from `peakmem-alloc` that tracks the peak heap
// allocation high-water mark while delegating real allocation to the system
// allocator; queried/reset by the profiling loops below.
#[global_allocator]
static PEAK_ALLOC: PeakMemAlloc<System> = PeakMemAlloc::new(System);

/// Returns the peak resident set size (max RSS) of the current process, in bytes.
///
/// Reads `ru_maxrss` via `getrusage(RUSAGE_SELF)`. macOS reports this field in
/// bytes while Linux reports it in kibibytes, so the value is normalized to
/// bytes here. Returns 0 if `getrusage` fails, rather than scaling garbage.
fn get_peak_rss_bytes() -> usize {
    // SAFETY: zero-initializing `libc::rusage` is valid — it is a plain C
    // struct of integer/timeval fields with no invalid bit patterns.
    let mut usage: libc::rusage = unsafe { std::mem::zeroed() };
    // SAFETY: RUSAGE_SELF is a valid `who` argument and `usage` is a live,
    // writable `rusage`; `getrusage` only writes into that buffer.
    let rc = unsafe { libc::getrusage(libc::RUSAGE_SELF, &mut usage) };
    if rc != 0 {
        // getrusage failed; report zero instead of an undefined value.
        return 0;
    }
    let max_rss = usage.ru_maxrss as usize;
    // macOS reports bytes, Linux reports KB
    if cfg!(target_os = "macos") {
        max_rss
    } else {
        max_rss * 1024
    }
}

#[derive(Parser)]
#[command(about = "Profile blob encoding pipeline")]
Expand All @@ -34,77 +53,159 @@ struct Args {
/// Number of iterations
#[arg(long, default_value_t = 1)]
iterations: u32,
}

fn parse_size(s: &str) -> Result<usize, String> {
let s = s.to_lowercase();
let (num, mult) = if let Some(n) = s.strip_suffix('g') {
(n, 1 << 30)
} else if let Some(n) = s.strip_suffix('m') {
(n, 1 << 20)
} else if let Some(n) = s.strip_suffix('k') {
(n, 1 << 10)
} else {
(s.as_str(), 1)
};
let n: usize = num.parse().map_err(|e| format!("invalid size: {e}"))?;
Ok(n * mult)
/// Number of blobs to encode concurrently (simulates multi-blob uploads)
#[arg(long, default_value_t = 1)]
concurrent_blobs: u32,
}

/// Entry point: parses CLI arguments, prints the run configuration and symbol
/// size, then dispatches to single-blob or concurrent-blob profiling.
///
/// Note: this reconstructs the intended post-change code — the flattened diff
/// had left both the old and new `print!`/`println!` statements interleaved.
fn main() {
    let args = Args::parse();
    let config = ReedSolomonEncodingConfig::new(NonZeroU16::new(args.shards).unwrap());

    // Print run parameters; mention concurrent_blobs only when it is in effect.
    if args.concurrent_blobs > 1 {
        print!(
            "blob_size={} shards={} iterations={} concurrent_blobs={}",
            format_size(args.size),
            args.shards,
            args.iterations,
            args.concurrent_blobs
        );
    } else {
        print!(
            "blob_size={} shards={} iterations={}",
            format_size(args.size),
            args.shards,
            args.iterations
        );
    }

    let blob = random_data(args.size);
    // Probe the symbol size with a throwaway encoder; it is dropped at the end
    // of this block so it does not linger during the profiled runs.
    let symbol_size = {
        let encoder = config.get_blob_encoder(&blob).unwrap();
        encoder.symbol_usize()
    };
    // Leading '\n' terminates the parameter line emitted with `print!` above.
    println!("\nsymbol_size={symbol_size}");

    if args.concurrent_blobs <= 1 {
        run_single_blob(&args, &config, &blob);
    } else {
        run_concurrent_blobs(&args, &config, &blob);
    }
}

/// Profiles sequential encoding: encodes a fresh copy of `blob` once per
/// iteration, reporting per-iteration duration, throughput, peak heap usage
/// (from the tracking allocator), peak RSS, and the heap/input expansion
/// factor; prints averages when more than one iteration was requested.
///
/// Note: this reconstructs the intended post-change code — the flattened diff
/// had left old (`blob.clone()`, short println) and new statements interleaved.
fn run_single_blob(args: &Args, config: &ReedSolomonEncodingConfig, blob: &[u8]) {
    let mut durations = Vec::with_capacity(args.iterations.try_into().unwrap());
    // Highest allocator-reported heap peak seen across all iterations.
    let mut max_peak_heap: usize = 0;

    for i in 0..args.iterations {
        // Copy the blob and build the encoder BEFORE resetting the peak
        // counter, so only the encoding work is attributed to the peak.
        let blob_copy = blob.to_vec();
        let encoder = config.get_blob_encoder(&blob_copy).unwrap();

        PEAK_ALLOC.reset_peak_memory();
        let start = Instant::now();
        let (_sliver_pairs, _metadata) = encoder.encode_with_metadata();
        let elapsed = start.elapsed();
        let peak_heap = PEAK_ALLOC.get_peak_memory();
        let peak_rss = get_peak_rss_bytes();

        durations.push(elapsed);
        max_peak_heap = max_peak_heap.max(peak_heap);
        let throughput_mbs = args.size as f64 / elapsed.as_secs_f64() / (1024.0 * 1024.0);
        // Expansion: peak heap relative to the input blob size.
        let expansion = peak_heap as f64 / args.size as f64;
        println!(
            " iteration {}: {:.3}s ({:.1} MiB/s) peak_heap={} peak_rss={} expansion={:.1}x",
            i + 1,
            elapsed.as_secs_f64(),
            throughput_mbs,
            format_size(peak_heap),
            format_size(peak_rss),
            expansion
        );
    }

    if args.iterations > 1 {
        let total: f64 = durations.iter().map(|d| d.as_secs_f64()).sum();
        let avg = total / f64::from(args.iterations);
        let throughput_mbs = args.size as f64 / avg / (1024.0 * 1024.0);
        println!(
            "average: {avg:.3}s ({throughput_mbs:.1} MiB/s) max_peak_heap={}",
            format_size(max_peak_heap)
        );
    }
}

fn format_size(bytes: usize) -> String {
if bytes >= 1 << 30 {
format!("{}GiB", bytes >> 30)
} else if bytes >= 1 << 20 {
format!("{}MiB", bytes >> 20)
} else if bytes >= 1 << 10 {
format!("{}KiB", bytes >> 10)
} else {
format!("{bytes}B")
/// Profiles concurrent encoding: encodes `args.concurrent_blobs` copies of
/// `blob` on parallel threads each iteration, reporting total wall time,
/// per-blob durations/throughput, peak heap and RSS, and the heap expansion
/// relative to the combined input size.
fn run_concurrent_blobs(args: &Args, config: &ReedSolomonEncodingConfig, blob: &[u8]) {
    let n: usize = args.concurrent_blobs.try_into().unwrap();
    let mut durations = Vec::with_capacity(args.iterations.try_into().unwrap());
    // Highest allocator-reported heap peak seen across all iterations.
    let mut max_peak_heap: usize = 0;

    for i in 0..args.iterations {
        // Pre-generate N blob copies and N encoders BEFORE resetting the peak
        // counter, so input allocation is excluded from the measured peak.
        let blob_copies: Vec<Vec<u8>> = (0..n).map(|_| blob.to_vec()).collect();
        let encoders: Vec<BlobEncoder<'_>> = blob_copies
            .iter()
            .map(|b| config.get_blob_encoder(b).unwrap())
            .collect();

        PEAK_ALLOC.reset_peak_memory();
        let start = Instant::now();

        // Use std::thread::scope to spawn N threads, each encoding one blob.
        // Scoped threads let each encoder borrow `blob_copies` without
        // requiring 'static lifetimes.
        let per_blob_elapsed: Vec<_> = std::thread::scope(|s| {
            let handles: Vec<_> = encoders
                .into_iter()
                .map(|encoder| {
                    s.spawn(move || {
                        // Each thread times only its own encode call.
                        let blob_start = Instant::now();
                        let (_sliver_pairs, _metadata) = encoder.encode_with_metadata();
                        blob_start.elapsed()
                    })
                })
                .collect();
            // Join in spawn order, collecting each blob's encode duration.
            handles.into_iter().map(|h| h.join().unwrap()).collect()
        });

        let wall_time = start.elapsed();
        let peak_heap = PEAK_ALLOC.get_peak_memory();
        let peak_rss = get_peak_rss_bytes();

        durations.push(wall_time);
        max_peak_heap = max_peak_heap.max(peak_heap);

        println!(
            " iteration {}: {:.3}s total wall time",
            i + 1,
            wall_time.as_secs_f64()
        );
        for (j, elapsed) in per_blob_elapsed.iter().enumerate() {
            let throughput_mbs = args.size as f64 / elapsed.as_secs_f64() / (1024.0 * 1024.0);
            println!(
                " blob {}: {:.3}s ({:.1} MiB/s)",
                j + 1,
                elapsed.as_secs_f64(),
                throughput_mbs
            );
        }
        // Expansion: peak heap relative to the combined size of all N inputs.
        let total_data = args.size * n;
        let expansion = peak_heap as f64 / total_data as f64;
        println!(
            " peak_heap={} peak_rss={} expansion={:.1}x (per blob: {})",
            format_size(peak_heap),
            format_size(peak_rss),
            expansion,
            format_size(peak_heap / n)
        );
    }

    if args.iterations > 1 {
        let total: f64 = durations.iter().map(|d| d.as_secs_f64()).sum();
        let avg = total / f64::from(args.iterations);
        // Aggregate throughput counts all N blobs' worth of data per iteration.
        let total_data = args.size * n;
        let throughput_mbs = total_data as f64 / avg / (1024.0 * 1024.0);
        println!(
            "average: {avg:.3}s ({throughput_mbs:.1} MiB/s) max_peak_heap={}",
            format_size(max_peak_heap)
        );
    }
}
2 changes: 1 addition & 1 deletion crates/walrus-e2e-tests/tests/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ async fn test_store_with_existing_storage_resource(
)
})
.collect();
let encoded_blobs = walrus_sdk::node_client::encode_blobs(unencoded_blobs, None, None)?;
let encoded_blobs = walrus_sdk::node_client::encode_blobs(unencoded_blobs, None, None, 1)?;
let encoded_sizes = encoded_blobs
.iter()
.map(|blob| {
Expand Down
10 changes: 10 additions & 0 deletions crates/walrus-sdk/src/config/communication_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ pub struct ClientCommunicationConfig {
#[serde(rename = "registration_delay_millis")]
#[serde_as(as = "DurationMilliSeconds")]
pub registration_delay: Duration,
/// Maximum number of blobs to encode concurrently. Limits peak memory when storing
/// multiple blobs. If `None`, defaults to 1 (sequential encoding).
#[serde(default)]
pub max_concurrent_blob_encodings: Option<usize>,
/// The maximum total blob size allowed to store if multiple blobs are uploaded.
pub max_total_blob_size: usize,
/// The configuration for the backoff after committee change is detected.
Expand Down Expand Up @@ -266,6 +270,7 @@ impl Default for ClientCommunicationConfig {
Duration::from_secs(5),
Some(5),
),
max_concurrent_blob_encodings: None,
sui_client_request_timeout: None,
}
}
Expand Down Expand Up @@ -322,6 +327,8 @@ pub struct CommunicationLimits {
pub max_data_in_flight: usize,
/// Configuration for auto-tuning concurrency during writes.
pub auto_tune: DataInFlightAutoTuneConfig,
/// The maximum number of blobs to encode concurrently.
pub max_concurrent_blob_encodings: usize,
}

impl CommunicationLimits {
Expand All @@ -345,6 +352,9 @@ impl CommunicationLimits {
max_concurrent_status_reads,
max_data_in_flight: communication_config.max_data_in_flight,
auto_tune: communication_config.data_in_flight_auto_tune.clone(),
max_concurrent_blob_encodings: communication_config
.max_concurrent_blob_encodings
.unwrap_or(1),
}
}

Expand Down
Loading
Loading