diff --git a/crates/relayer/src/event/bus.rs b/crates/relayer/src/event/bus.rs index c7b750048a..f3f1424bf7 100644 --- a/crates/relayer/src/event/bus.rs +++ b/crates/relayer/src/event/bus.rs @@ -38,8 +38,10 @@ impl EventBus { } } - // Remove all disconnected subscribers - for idx in disconnected { + // Remove all disconnected subscribers in reverse order. + // This is critical: removing in ascending order would shift indices + // and cause wrong elements to be removed. + for idx in disconnected.into_iter().rev() { self.txs.remove(idx); } } @@ -117,4 +119,38 @@ mod tests { assert_eq!(counter(), 20); } + + #[test] + #[serial] + fn multiple_disconnected_subscribers() { + reset_counter(); + + let mut bus: EventBus = EventBus::new(); + + // Create 5 subscribers + let rx0 = bus.subscribe(); + let rx1 = bus.subscribe(); + let rx2 = bus.subscribe(); + let rx3 = bus.subscribe(); + let rx4 = bus.subscribe(); + + assert_eq!(bus.txs.len(), 5); + + // Drop subscribers at indices 1 and 3 (non-contiguous) + // This tests that index removal works correctly + drop(rx1); + drop(rx3); + + // Broadcast should succeed for remaining subscribers (rx0, rx2, rx4) + // and clean up disconnected ones + bus.broadcast(42); + + // Verify bus state is correct after cleanup - disconnected senders removed + assert_eq!(bus.txs.len(), 3); + + // Verify remaining subscribers received the message + assert_eq!(rx0.recv(), Ok(42)); + assert_eq!(rx2.recv(), Ok(42)); + assert_eq!(rx4.recv(), Ok(42)); + } } diff --git a/crates/relayer/src/supervisor.rs b/crates/relayer/src/supervisor.rs index c994ac6585..28bbbde39f 100644 --- a/crates/relayer/src/supervisor.rs +++ b/crates/relayer/src/supervisor.rs @@ -837,7 +837,7 @@ fn process_batch( trace!( "skipping events for '{}': destination chain '{}' is not registered", object.short_name(), - object.src_chain_id() + object.dst_chain_id() ); continue; diff --git a/crates/relayer/src/worker/packet.rs b/crates/relayer/src/worker/packet.rs index 8a083a53dd..5885210f8b 100644 --- a/crates/relayer/src/worker/packet.rs +++ b/crates/relayer/src/worker/packet.rs @@ -276,13 +276,24 @@ fn handle_packet_cmd( WorkerCmd::IbcEvents { batch } if link.a_to_b.channel().ordering == Ordering::Ordered => { let lowest_sequence = lowest_sequence(&batch.events); - let next_sequence = query_next_sequence_receive( + let next_sequence = match query_next_sequence_receive( link.a_to_b.dst_chain(), link.a_to_b.dst_port_id(), link.a_to_b.dst_channel_id(), QueryHeight::Specific(batch.height), - ) - .ok(); + ) { + Ok(seq) => Some(seq), + Err(e) => { + // Log the error but continue - clearing will be triggered due to + // None < Some(x) being true, which is the safe fallback behavior + warn!( + "failed to query next_sequence_receive for ordered channel, \ + will trigger clearing as fallback: {}", + e + ); + None + } + }; if *should_clear_on_start || next_sequence < lowest_sequence { handle_clear_packet(link, clear_interval, path, Some(batch.height), clear_limit)?;