Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 25 additions & 13 deletions crates/core/src/bc_protocol/connection/bcconn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ pub struct BcConnection {

impl BcConnection {
pub async fn new(mut sink: BcConnSink, mut source: BcConnSource) -> Result<BcConnection> {
let (sinker, sinker_rx) = channel::<Result<Bc>>(100);
let (sinker, sinker_rx) = channel::<Result<Bc>>(500);
let cancel = CancellationToken::new();

let (poll_commander, poll_commanded) = channel(200);
let (poll_commander, poll_commanded) = channel(1000);
let mut poller = Poller {
subscribers: Default::default(),
sink: sinker.clone(),
Expand Down Expand Up @@ -114,7 +114,7 @@ impl BcConnection {
}

pub async fn subscribe(&self, msg_id: u32, msg_num: u16) -> Result<BcSubscription> {
let (tx, rx) = channel(100);
let (tx, rx) = channel(500);
self.poll_commander
.send(PollCommand::AddSubscriber(msg_id, Some(msg_num), tx))
.await?;
Expand Down Expand Up @@ -151,7 +151,7 @@ impl BcConnection {
///
/// This function creates a temporary handle to grab this single message
pub async fn subscribe_to_id(&self, msg_id: u32) -> Result<BcSubscription> {
let (tx, rx) = channel(100);
let (tx, rx) = channel(500);
self.poll_commander
.send(PollCommand::AddSubscriber(msg_id, None, tx))
.await?;
Expand Down Expand Up @@ -310,14 +310,26 @@ impl Poller {
};
if let Some(sender) = sender {
if sender.capacity() == 0 {
warn!("Reaching limit of channel");
warn!(
"Remaining: {} of {} message space for {} (ID: {})",
sender.capacity(),
sender.max_capacity(),
&msg_num,
&msg_id
);
// Channel is full. Use try_send to avoid blocking
// the message loop, which would prevent keepalive
// ping processing and cause camera disconnection.
match sender.try_send(Ok(response)) {
Ok(()) => {
trace!(
"Sent to full channel for {} (ID: {})",
&msg_num,
&msg_id
);
}
Err(_) => {
warn!(
"Channel full, dropping message for {} (ID: {}), capacity: {}",
&msg_num,
&msg_id,
sender.max_capacity()
);
}
}
} else {
trace!(
"Remaining: {} of {} message space for {} (ID: {})",
Expand All @@ -326,8 +338,8 @@ impl Poller {
&msg_num,
&msg_id
);
let _ = sender.send(Ok(response)).await;
}
let _ = sender.send(Ok(response)).await;
} else {
trace!(
"Ignoring uninteresting message id {} (number: {})",
Expand Down
40 changes: 26 additions & 14 deletions src/rtsp/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,13 +313,20 @@ fn send_to_sources(
BcMedia::Aac(aac) => {
let duration = aac.duration().expect("Could not calculate AAC duration");
if let Some(aud_src) = aud_src.as_ref() {
log::debug!("Sending AAC: {:?}", Duration::from_micros(*aud_ts as u64));
send_to_appsrc(
aud_src,
aac.data,
Duration::from_micros(*aud_ts as u64),
pools,
)?;
// Drop audio frames when buffer is nearly full to prevent
// cascading backpressure that can stall the video pipeline
let max = aud_src.max_bytes();
if max > 0 && aud_src.current_level_bytes() >= max * 9 / 10 {
log::debug!("Audio buffer near capacity, dropping AAC frame");
} else {
log::debug!("Sending AAC: {:?}", Duration::from_micros(*aud_ts as u64));
send_to_appsrc(
aud_src,
aac.data,
Duration::from_micros(*aud_ts as u64),
pools,
)?;
}
}
*aud_ts += duration;
}
Expand All @@ -328,13 +335,18 @@ fn send_to_sources(
.duration()
.expect("Could not calculate ADPCM duration");
if let Some(aud_src) = aud_src.as_ref() {
log::trace!("Sending ADPCM: {:?}", Duration::from_micros(*aud_ts as u64));
send_to_appsrc(
aud_src,
adpcm.data,
Duration::from_micros(*aud_ts as u64),
pools,
)?;
let max = aud_src.max_bytes();
if max > 0 && aud_src.current_level_bytes() >= max * 9 / 10 {
log::debug!("Audio buffer near capacity, dropping ADPCM frame");
} else {
log::trace!("Sending ADPCM: {:?}", Duration::from_micros(*aud_ts as u64));
send_to_appsrc(
aud_src,
adpcm.data,
Duration::from_micros(*aud_ts as u64),
pools,
)?;
}
}
*aud_ts += duration;
}
Expand Down