diff --git a/crates/core/src/bc_protocol/connection/bcconn.rs b/crates/core/src/bc_protocol/connection/bcconn.rs index 06a5d0597..391e7e788 100644 --- a/crates/core/src/bc_protocol/connection/bcconn.rs +++ b/crates/core/src/bc_protocol/connection/bcconn.rs @@ -43,10 +43,10 @@ pub struct BcConnection { impl BcConnection { pub async fn new(mut sink: BcConnSink, mut source: BcConnSource) -> Result { - let (sinker, sinker_rx) = channel::>(100); + let (sinker, sinker_rx) = channel::>(500); let cancel = CancellationToken::new(); - let (poll_commander, poll_commanded) = channel(200); + let (poll_commander, poll_commanded) = channel(1000); let mut poller = Poller { subscribers: Default::default(), sink: sinker.clone(), @@ -114,7 +114,7 @@ impl BcConnection { } pub async fn subscribe(&self, msg_id: u32, msg_num: u16) -> Result { - let (tx, rx) = channel(100); + let (tx, rx) = channel(500); self.poll_commander .send(PollCommand::AddSubscriber(msg_id, Some(msg_num), tx)) .await?; @@ -151,7 +151,7 @@ impl BcConnection { /// /// This function creates a temporary handle to grab this single message pub async fn subscribe_to_id(&self, msg_id: u32) -> Result { - let (tx, rx) = channel(100); + let (tx, rx) = channel(500); self.poll_commander .send(PollCommand::AddSubscriber(msg_id, None, tx)) .await?; @@ -310,14 +310,26 @@ impl Poller { }; if let Some(sender) = sender { if sender.capacity() == 0 { - warn!("Reaching limit of channel"); - warn!( - "Remaining: {} of {} message space for {} (ID: {})", - sender.capacity(), - sender.max_capacity(), - &msg_num, - &msg_id - ); + // Channel is full. Use try_send to avoid blocking + // the message loop, which would prevent keepalive + // ping processing and cause camera disconnection. + match sender.try_send(Ok(response)) { + Ok(()) => { + trace!( + "Sent to full channel for {} (ID: {})", + &msg_num, + &msg_id + ); + } + Err(_) => { + warn!( + "Channel full, dropping message for {} (ID: {}), capacity: {}", + &msg_num, + &msg_id, + sender.max_capacity() + ); + } + } } else { trace!( "Remaining: {} of {} message space for {} (ID: {})", @@ -326,8 +338,8 @@ impl Poller { &msg_num, &msg_id ); + let _ = sender.send(Ok(response)).await; } - let _ = sender.send(Ok(response)).await; } else { trace!( "Ignoring uninteresting message id {} (number: {})", diff --git a/src/rtsp/factory.rs b/src/rtsp/factory.rs index 9ba058eed..9f085d9af 100644 --- a/src/rtsp/factory.rs +++ b/src/rtsp/factory.rs @@ -313,13 +313,20 @@ fn send_to_sources( BcMedia::Aac(aac) => { let duration = aac.duration().expect("Could not calculate AAC duration"); if let Some(aud_src) = aud_src.as_ref() { - log::debug!("Sending AAC: {:?}", Duration::from_micros(*aud_ts as u64)); - send_to_appsrc( - aud_src, - aac.data, - Duration::from_micros(*aud_ts as u64), - pools, - )?; + // Drop audio frames when buffer is nearly full to prevent + // cascading backpressure that can stall the video pipeline + let max = aud_src.max_bytes(); + if max > 0 && aud_src.current_level_bytes() >= max * 9 / 10 { + log::debug!("Audio buffer near capacity, dropping AAC frame"); + } else { + log::debug!("Sending AAC: {:?}", Duration::from_micros(*aud_ts as u64)); + send_to_appsrc( + aud_src, + aac.data, + Duration::from_micros(*aud_ts as u64), + pools, + )?; + } } *aud_ts += duration; } @@ -328,13 +335,18 @@ fn send_to_sources( .duration() .expect("Could not calculate ADPCM duration"); if let Some(aud_src) = aud_src.as_ref() { - log::trace!("Sending ADPCM: {:?}", Duration::from_micros(*aud_ts as u64)); - send_to_appsrc( - aud_src, - adpcm.data, - Duration::from_micros(*aud_ts as u64), - pools, - )?; + let max = aud_src.max_bytes(); + if max > 0 && aud_src.current_level_bytes() >= max * 9 / 10 { + log::debug!("Audio buffer near capacity, dropping ADPCM frame"); + } else { + log::trace!("Sending ADPCM: {:?}", Duration::from_micros(*aud_ts as u64)); + send_to_appsrc( + aud_src, + adpcm.data, + Duration::from_micros(*aud_ts as u64), + pools, + )?; + } } *aud_ts += duration; }