Skip to content

Commit de661ff

Browse files
committed
perf: parallelize compute_metadata() with rayon
Materialize all secondary slivers up front and use par_iter over all n_shards columns for primary encoding + hashing, matching the pattern in encode_with_metadata(). Each rayon task creates its own encoder. Replaces two sequential encoding loops with a single parallel pass.
1 parent 0ad7523 commit de661ff

1 file changed

Lines changed: 35 additions & 41 deletions

File tree

crates/walrus-core/src/encoding/blob_encoding.rs

Lines changed: 35 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -441,16 +441,19 @@ impl<'a> BlobEncoder<'a> {
441441
u64::try_from(self.blob.len()).expect("any valid blob size fits into a `u64`");
442442
let n_shards = self.inner.n_shards_usize();
443443

444-
let n_non_systematic_secondary_slivers = n_shards - self.inner.n_columns_usize();
445-
// Use a dummy sliver index for the non-systematic secondary slivers.
446-
let mut non_systematic_secondary_slivers =
447-
vec![
448-
self.inner.empty_sliver::<Secondary>(SliverIndex(0));
449-
n_non_systematic_secondary_slivers
450-
];
451-
452-
// Compute the non-systematic secondary slivers by encoding the rows (i.e., primary
453-
// slivers).
444+
// Materialize all secondary slivers (systematic + non-systematic).
445+
let mut secondary_slivers = self.inner.empty_slivers::<Secondary>();
446+
447+
// Copy systematic columns from blob data.
448+
for (column, sliver) in self.column_symbols().zip(secondary_slivers.iter_mut()) {
449+
sliver
450+
.symbols
451+
.to_symbols_mut()
452+
.zip(column)
453+
.for_each(|(dest, src)| dest[..src.len()].copy_from_slice(src))
454+
}
455+
456+
// Compute non-systematic secondary slivers by encoding rows (sequential).
454457
let mut secondary_encoder = self.inner.get_encoder::<Secondary>();
455458
let mut buffer = Symbols::zeros(self.inner.n_columns_usize(), self.inner.symbol_size);
456459
let row_length_bytes =
@@ -467,48 +470,39 @@ impl<'a> BlobEncoder<'a> {
467470
let encode_result = secondary_encoder
468471
.encode(data)
469472
.expect("size has already been checked");
470-
for (symbol, sliver) in encode_result
471-
.recovery_iter()
472-
.zip(non_systematic_secondary_slivers.iter_mut())
473-
{
473+
for (symbol, sliver) in encode_result.recovery_iter().zip(
474+
secondary_slivers
475+
.iter_mut()
476+
.skip(self.inner.n_columns_usize()),
477+
) {
474478
sliver.copy_symbol_to(r, symbol);
475479
}
476480
}
477481
drop(secondary_encoder);
478482

479-
// Now we can encode all secondary slivers, computing all symbol hashes.
483+
// Parallel primary encoding + hashing over all secondary slivers.
480484
let mut symbol_hashes = vec![Node::Empty; n_shards * n_shards];
481485

482-
// First encode the systematic secondary slivers.
483-
let mut primary_encoder = self.inner.get_encoder::<Primary>();
484-
let mut buffer = Symbols::zeros(self.inner.n_rows_usize(), self.inner.symbol_size);
485-
for (col_index, column_symbols) in self.column_symbols().enumerate() {
486-
buffer.data_mut().fill(0);
487-
buffer
488-
.to_symbols_mut()
489-
.zip(column_symbols)
490-
.for_each(|(dest, src)| dest[..src.len()].copy_from_slice(src));
491-
let symbols = primary_encoder
492-
.encode_all_ref(buffer.data())
493-
.expect("size has already been checked");
494-
for (row_index, symbol) in symbols.to_symbols().enumerate() {
495-
symbol_hashes[n_shards * row_index + col_index] = leaf_hash::<Blake2b256>(symbol);
496-
}
497-
}
486+
let column_results: Vec<(usize, Vec<Node>)> = secondary_slivers
487+
.par_iter()
488+
.enumerate()
489+
.map(|(col_index, column)| {
490+
let mut encoder = self.inner.get_encoder::<Primary>();
491+
let symbols = encoder
492+
.encode_all(column.symbols.data())
493+
.expect("size has already been checked");
494+
let hashes: Vec<Node> = symbols.to_symbols().map(leaf_hash::<Blake2b256>).collect();
495+
(col_index, hashes)
496+
})
497+
.collect();
498498

499-
// Then encode the non-systematic secondary slivers.
500-
for (col_index, column) in non_systematic_secondary_slivers.iter().enumerate() {
501-
let col_index = col_index + self.inner.n_columns_usize();
502-
let symbols = primary_encoder
503-
.encode_all_ref(column.symbols.data())
504-
.expect("size has already been checked");
505-
for (row_index, symbol) in symbols.to_symbols().enumerate() {
506-
symbol_hashes[n_shards * row_index + col_index] = leaf_hash::<Blake2b256>(symbol);
499+
// Sequential scatter.
500+
for (col_index, hashes) in column_results {
501+
for (row_index, hash) in hashes.into_iter().enumerate() {
502+
symbol_hashes[n_shards * row_index + col_index] = hash;
507503
}
508504
}
509505

510-
drop(primary_encoder);
511-
512506
BlobEncoderData::compute_metadata_from_symbol_hashes(
513507
self.inner.config,
514508
&symbol_hashes,

0 commit comments

Comments (0)