From 7b0cbe48762f54bea91f9745194482bd63f34fff Mon Sep 17 00:00:00 2001 From: joshkautz Date: Tue, 3 Mar 2026 17:29:34 -0600 Subject: [PATCH] Fix channel backpressure causing keepalive timeout and stream drops When the mpsc message channel reaches capacity, the blocking send().await stalls the entire message processing loop. This prevents keepalive ping responses from being processed, causing the camera to interpret the silence as a disconnection and drop the session. This manifests as: - "Reaching limit of channel" warnings followed by stream death - Streams dying after hours of continuous operation - Users resorting to periodic cron restarts as a workaround Three changes to fix this: 1. Increase channel capacities to reduce how often channels fill: - Outgoing message channel: 100 -> 500 - Poll command channel: 200 -> 1000 - Subscriber channels: 100 -> 500 2. Use non-blocking try_send when the channel is at capacity. When the channel is full, dropping a message (typically a video frame) is far better than blocking the message loop and losing the entire camera connection. Normal async send is still used when there is available capacity. 3. Drop audio frames when the audio AppSrc buffer is near capacity (>= 90% full) instead of pushing them and causing cascading backpressure that stalls the video pipeline. Audio buffer overflow was triggering pipeline flushing, which caused full stream reconnection cycles. Addresses #349, #346, #315, and the channel limit warnings in #366. 
--- .../core/src/bc_protocol/connection/bcconn.rs | 38 ++++++++++++------ src/rtsp/factory.rs | 40 ++++++++++++------- 2 files changed, 51 insertions(+), 27 deletions(-) diff --git a/crates/core/src/bc_protocol/connection/bcconn.rs b/crates/core/src/bc_protocol/connection/bcconn.rs index 06a5d0597..391e7e788 100644 --- a/crates/core/src/bc_protocol/connection/bcconn.rs +++ b/crates/core/src/bc_protocol/connection/bcconn.rs @@ -43,10 +43,10 @@ pub struct BcConnection { impl BcConnection { pub async fn new(mut sink: BcConnSink, mut source: BcConnSource) -> Result { - let (sinker, sinker_rx) = channel::>(100); + let (sinker, sinker_rx) = channel::>(500); let cancel = CancellationToken::new(); - let (poll_commander, poll_commanded) = channel(200); + let (poll_commander, poll_commanded) = channel(1000); let mut poller = Poller { subscribers: Default::default(), sink: sinker.clone(), @@ -114,7 +114,7 @@ impl BcConnection { } pub async fn subscribe(&self, msg_id: u32, msg_num: u16) -> Result { - let (tx, rx) = channel(100); + let (tx, rx) = channel(500); self.poll_commander .send(PollCommand::AddSubscriber(msg_id, Some(msg_num), tx)) .await?; @@ -151,7 +151,7 @@ impl BcConnection { /// /// This function creates a temporary handle to grab this single message pub async fn subscribe_to_id(&self, msg_id: u32) -> Result { - let (tx, rx) = channel(100); + let (tx, rx) = channel(500); self.poll_commander .send(PollCommand::AddSubscriber(msg_id, None, tx)) .await?; @@ -310,14 +310,26 @@ impl Poller { }; if let Some(sender) = sender { if sender.capacity() == 0 { - warn!("Reaching limit of channel"); - warn!( - "Remaining: {} of {} message space for {} (ID: {})", - sender.capacity(), - sender.max_capacity(), - &msg_num, - &msg_id - ); + // Channel is full. Use try_send to avoid blocking + // the message loop, which would prevent keepalive + // ping processing and cause camera disconnection. 
+ match sender.try_send(Ok(response)) { + Ok(()) => { + trace!( + "Sent to full channel for {} (ID: {})", + &msg_num, + &msg_id + ); + } + Err(_) => { + warn!( + "Channel full, dropping message for {} (ID: {}), capacity: {}", + &msg_num, + &msg_id, + sender.max_capacity() + ); + } + } } else { trace!( "Remaining: {} of {} message space for {} (ID: {})", @@ -326,8 +338,8 @@ impl Poller { &msg_num, &msg_id ); + let _ = sender.send(Ok(response)).await; } - let _ = sender.send(Ok(response)).await; } else { trace!( "Ignoring uninteresting message id {} (number: {})", diff --git a/src/rtsp/factory.rs b/src/rtsp/factory.rs index 9ba058eed..9f085d9af 100644 --- a/src/rtsp/factory.rs +++ b/src/rtsp/factory.rs @@ -313,13 +313,20 @@ fn send_to_sources( BcMedia::Aac(aac) => { let duration = aac.duration().expect("Could not calculate AAC duration"); if let Some(aud_src) = aud_src.as_ref() { - log::debug!("Sending AAC: {:?}", Duration::from_micros(*aud_ts as u64)); - send_to_appsrc( - aud_src, - aac.data, - Duration::from_micros(*aud_ts as u64), - pools, - )?; + // Drop audio frames when buffer is nearly full to prevent + // cascading backpressure that can stall the video pipeline + let max = aud_src.max_bytes(); + if max > 0 && aud_src.current_level_bytes() >= max * 9 / 10 { + log::debug!("Audio buffer near capacity, dropping AAC frame"); + } else { + log::debug!("Sending AAC: {:?}", Duration::from_micros(*aud_ts as u64)); + send_to_appsrc( + aud_src, + aac.data, + Duration::from_micros(*aud_ts as u64), + pools, + )?; + } } *aud_ts += duration; } @@ -328,13 +335,18 @@ fn send_to_sources( .duration() .expect("Could not calculate ADPCM duration"); if let Some(aud_src) = aud_src.as_ref() { - log::trace!("Sending ADPCM: {:?}", Duration::from_micros(*aud_ts as u64)); - send_to_appsrc( - aud_src, - adpcm.data, - Duration::from_micros(*aud_ts as u64), - pools, - )?; + let max = aud_src.max_bytes(); + if max > 0 && aud_src.current_level_bytes() >= max * 9 / 10 { + log::debug!("Audio 
buffer near capacity, dropping ADPCM frame"); + } else { + log::trace!("Sending ADPCM: {:?}", Duration::from_micros(*aud_ts as u64)); + send_to_appsrc( + aud_src, + adpcm.data, + Duration::from_micros(*aud_ts as u64), + pools, + )?; + } } *aud_ts += duration; }