diff --git a/Cargo.lock b/Cargo.lock index cfd3531c48..901e5f2212 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -149,6 +149,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + [[package]] name = "anstream" version = "0.6.19" @@ -923,7 +929,7 @@ dependencies = [ "bitflags 2.9.1", "cexpr", "clang-sys", - "itertools 0.10.5", + "itertools 0.13.0", "proc-macro2 1.0.95", "quote", "regex", @@ -1295,6 +1301,12 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "cbc" version = "0.1.2" @@ -1400,6 +1412,33 @@ dependencies = [ "windows-link", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half 2.7.1", +] + [[package]] name = "cipher" version = "0.3.0" @@ -1820,6 +1859,42 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "criterion" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap 4.5.40", + "criterion-plot", + "is-terminal", + "itertools 0.10.5", + "num-traits", + "once_cell", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools 0.10.5", +] + [[package]] name = "critical-section" version = "1.2.0" @@ -3479,6 +3554,17 @@ version = "1.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b43ede17f21864e81be2fa654110bf1e793774238d86ef8555c37e6519c0403" +[[package]] +name = "half" +version = "2.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ea2d84b969582b4b1864a92dc5d27cd2b77b622a8d79306834f1be5ba20d84b" +dependencies = [ + "cfg-if", + "crunchy", + "zerocopy", +] + [[package]] name = "hash32" version = "0.2.1" @@ -4502,6 +4588,7 @@ dependencies = [ "bs58", "byte-unit", "bytes", + "criterion", "crossbeam-channel", "digest 0.10.7", "dirs-next", @@ -5295,6 +5382,17 @@ dependencies = [ "serde", ] +[[package]] +name = "is-terminal" +version = "0.4.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" +dependencies = [ + "hermit-abi 0.5.2", + "libc", + "windows-sys 0.60.2", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -6799,6 +6897,12 @@ dependencies = [ "eyre", ] +[[package]] +name = "oorandom" +version = "11.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" + [[package]] name = "opaque-debug" version = "0.3.1" @@ -8297,6 +8401,34 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "plotters" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" + +[[package]] +name = "plotters-svg" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" +dependencies = [ + "plotters-backend", +] + [[package]] name = "poly1305" version = "0.7.2" @@ -9666,7 +9798,7 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5" dependencies = [ - "half", + "half 1.8.3", "serde", ] @@ -10715,6 +10847,16 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.9.0" diff --git a/crates/relayer/Cargo.toml b/crates/relayer/Cargo.toml index 672c34c00d..67c4d8bd38 100644 --- a/crates/relayer/Cargo.toml +++ b/crates/relayer/Cargo.toml @@ -97,6 +97,11 @@ ibc-relayer-types = { workspace = true } serial_test = { workspace = true } env_logger = { workspace = true } test-log = { workspace = true, features = ["trace"] } +criterion = "0.5" # Needed for generating (synthetic) light blocks. tendermint-testgen = { workspace = true } + +[[bench]] +name = "vector_ops" +harness = false diff --git a/crates/relayer/benches/vector_ops.rs b/crates/relayer/benches/vector_ops.rs new file mode 100644 index 0000000000..33c31d4f56 --- /dev/null +++ b/crates/relayer/benches/vector_ops.rs @@ -0,0 +1,210 @@ +use std::str::FromStr; +use std::time::Duration; + +use criterion::{black_box, criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion}; +use ibc_proto::google::protobuf::Any; +use ibc_relayer::chain::tracking::TrackingId; +use ibc_relayer::event::IbcEventWithHeight; +use ibc_relayer::link::operational_data::{OperationalData, OperationalDataTarget, TransitMessage}; +use ibc_relayer_types::core::ics04_channel::events::{SendPacket, WriteAcknowledgement}; +use ibc_relayer_types::core::ics04_channel::packet::Packet; +use ibc_relayer_types::core::ics04_channel::packet::Sequence; +use ibc_relayer_types::core::ics04_channel::timeout::TimeoutHeight; +use ibc_relayer_types::core::ics24_host::identifier::{ChannelId, PortId}; +use ibc_relayer_types::events::IbcEvent; +use ibc_relayer_types::timestamp::Timestamp; +use ibc_relayer_types::Height; +use once_cell::sync::Lazy; + +const NUM_BATCHES: usize = 32; +const TIMEOUT_STRIDE: u64 = 5; +static PORT_ID: Lazy = Lazy::new(|| PortId::from_str("transfer").expect("valid port id")); +static CHANNEL_ID: Lazy = + Lazy::new(|| ChannelId::from_str("channel-0").expect("valid channel id")); + +fn relay_path_benchmark(c: &mut Criterion) { + let mut group = c.benchmark_group("relay_path_timeout_processing"); + + for &batch_size in &[32usize, 64, 128] { + group.bench_with_input( + BenchmarkId::new("legacy_clone", batch_size), + &batch_size, + |b, &size| { + b.iter_batched( + || build_operational_batches(size), + |dataset| { + let (_, total) = process_batches_with_clones(dataset); + black_box(total); + }, + BatchSize::SmallInput, + ); + }, + ); + + group.bench_with_input( + BenchmarkId::new("drain_reuse", batch_size), + &batch_size, + |b, &size| { + b.iter_batched( + || build_operational_batches(size), + |dataset| { + let (_, total) = process_batches_with_drain(dataset); + black_box(total); + }, + BatchSize::SmallInput, + ); + }, + ); + } + + group.finish(); +} + +fn build_operational_batches(batch_size: usize) -> Vec { + (0..NUM_BATCHES) + .map(|batch_index| { + let mut odata = OperationalData::new( + Height::new(0, 10).expect("height"), + OperationalDataTarget::Destination, + TrackingId::new_static("bench"), + Duration::ZERO, + ); + + for entry in 0..batch_size { + let seq = (batch_index * batch_size + entry + 1) as u64; + let message = if seq % TIMEOUT_STRIDE == 0 { + make_send_packet(seq) + } else { + make_write_ack(seq) + }; + + odata.push(message); + } + + odata + }) + .collect() +} + +fn make_send_packet(sequence: u64) -> TransitMessage { + let packet = Packet { + sequence: Sequence::from(sequence), + source_port: PORT_ID.clone(), + source_channel: CHANNEL_ID.clone(), + destination_port: PORT_ID.clone(), + destination_channel: CHANNEL_ID.clone(), + data: vec![0; 32], + timeout_height: TimeoutHeight::default(), + timeout_timestamp: Timestamp::none(), + }; + + let event = IbcEvent::SendPacket(SendPacket { packet }); + + TransitMessage { + event_with_height: IbcEventWithHeight::new( + event, + Height::new(0, sequence + 1).expect("height"), + ), + msg: Any { + type_url: "bench/send".into(), + value: sequence.to_le_bytes().to_vec(), + }, + } +} + +fn make_write_ack(sequence: u64) -> TransitMessage { + let packet = Packet { + sequence: Sequence::from(sequence), + source_port: PORT_ID.clone(), + source_channel: CHANNEL_ID.clone(), + destination_port: PORT_ID.clone(), + destination_channel: CHANNEL_ID.clone(), + data: vec![1; 32], + timeout_height: TimeoutHeight::default(), + timeout_timestamp: Timestamp::none(), + }; + + let event = IbcEvent::WriteAcknowledgement(WriteAcknowledgement { + packet, + ack: vec![0_u8; 8], + }); + + TransitMessage { + event_with_height: IbcEventWithHeight::new( + event, + Height::new(0, sequence + 1).expect("height"), + ), + msg: Any { + type_url: "bench/ack".into(), + value: sequence.to_le_bytes().to_vec(), + }, + } +} + +fn build_timeout_msg() -> Any { + Any { + type_url: "bench/timeout".into(), + value: vec![0_u8; 4], + } +} + +fn should_timeout(sequence: &Sequence) -> bool { + sequence.as_u64() % TIMEOUT_STRIDE == 0 +} + +fn process_batches_with_clones(mut data: Vec) -> (Vec, usize) { + let mut timed_out: Vec> = vec![Vec::new(); data.len()]; + let mut total_timeouts = 0usize; + + for (idx, odata) in data.iter_mut().enumerate() { + let mut retain_batch = Vec::new(); + + for gm in odata.batch.iter() { + match &gm.event_with_height.event { + IbcEvent::SendPacket(event) if should_timeout(&event.packet.sequence) => { + total_timeouts += 1; + let mut cloned = gm.clone(); + cloned.msg = build_timeout_msg(); + timed_out[idx].push(cloned); + } + _ => retain_batch.push(gm.clone()), + } + } + + odata.batch = retain_batch; + } + + let scheduled: usize = timed_out.into_iter().map(|v| v.len()).sum(); + (data, total_timeouts + scheduled) +} + +fn process_batches_with_drain(mut data: Vec) -> (Vec, usize) { + let mut timed_out: Vec> = vec![Vec::new(); data.len()]; + let mut total_timeouts = 0usize; + + for (idx, odata) in data.iter_mut().enumerate() { + let mut retain_batch = Vec::with_capacity(odata.batch.len()); + + for gm in odata.batch.drain(..) { + match &gm.event_with_height.event { + IbcEvent::SendPacket(event) if should_timeout(&event.packet.sequence) => { + total_timeouts += 1; + let timeout_msg = TransitMessage { + event_with_height: gm.event_with_height.clone(), + msg: build_timeout_msg(), + }; + timed_out[idx].push(timeout_msg); + } + _ => retain_batch.push(gm), + } + } + + odata.batch = retain_batch; + } + + let scheduled: usize = timed_out.into_iter().map(|v| v.len()).sum(); + (data, total_timeouts + scheduled) +} + +criterion_group!(benches, relay_path_benchmark); +criterion_main!(benches); diff --git a/crates/relayer/src/link/relay_path.rs b/crates/relayer/src/link/relay_path.rs index ff16acd086..12baf4c943 100644 --- a/crates/relayer/src/link/relay_path.rs +++ b/crates/relayer/src/link/relay_path.rs @@ -1,4 +1,3 @@ -use alloc::collections::BTreeMap as HashMap; use alloc::collections::VecDeque; use ibc_relayer_types::core::ics04_channel::packet::Sequence; use std::ops::Sub; @@ -1680,19 +1679,17 @@ impl RelayPath { // to source operational data. let mut all_dst_odata = self.dst_operational_data.clone_vec(); - let mut timed_out: HashMap = HashMap::default(); + let mut timed_out: Vec> = vec![None; all_dst_odata.len()]; // For each operational data targeting the destination chain... for (odata_pos, odata) in all_dst_odata.iter_mut().enumerate() { // ... check each `SendPacket` event, whether it should generate a timeout message - let mut retain_batch = vec![]; + let mut retain_batch = Vec::with_capacity(odata.batch.len()); + let odata_info = odata.info(); + let odata_tracking_id = odata.tracking_id; - for gm in odata.batch.iter() { - let TransitMessage { - event_with_height, .. - } = gm; - - match &event_with_height.event { + for gm in odata.batch.drain(..) { + match &gm.event_with_height.event { IbcEvent::SendPacket(event) => { // Catch any SendPacket event that timed-out if self.send_packet_event_handled(event)? { @@ -1702,36 +1699,37 @@ impl RelayPath { { debug!( "found a timed-out message in the operational data: {}", - odata.info(), + odata_info, ); - timed_out - .entry(odata_pos) - .or_insert_with(|| { - OperationalData::new( - dst_current_height, - OperationalDataTarget::Source, - odata.tracking_id, - self.channel.connection_delay, - ) - }) - .push(TransitMessage { - event_with_height: event_with_height.clone(), - msg: new_msg, - }); + let slot = timed_out + .get_mut(odata_pos) + .expect("timed_out vector is sized to match operational data"); + slot.get_or_insert_with(|| { + OperationalData::new( + dst_current_height, + OperationalDataTarget::Source, + odata_tracking_id, + self.channel.connection_delay, + ) + }) + .push(TransitMessage { + event_with_height: gm.event_with_height.clone(), + msg: new_msg, + }); } else { // A SendPacket event, but did not time-out yet, retain - retain_batch.push(gm.clone()); + retain_batch.push(gm); } } IbcEvent::WriteAcknowledgement(event) => { if self.write_ack_event_handled(event)? { debug!(?event, "WriteAcknowledgement has already been handled"); } else { - retain_batch.push(gm.clone()); + retain_batch.push(gm); } } - _ => retain_batch.push(gm.clone()), + _ => retain_batch.push(gm), } } @@ -1747,13 +1745,13 @@ impl RelayPath { self.dst_operational_data.replace(all_dst_odata); // Handle timed-out events - if timed_out.is_empty() { + if timed_out.iter().all(|entry| entry.is_none()) { // Nothing timed out in the meantime return Ok(()); } // Schedule new operational data targeting the source chain - for (_, new_od) in timed_out.into_iter() { + for new_od in timed_out.into_iter().flatten() { info!( "re-scheduling from new timed-out batch of size {}", new_od.batch.len()