Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 78 additions & 45 deletions src/microphone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,23 +100,24 @@

use core::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{thread, time::Duration};
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

use crate::common::assert_error_traits;
use crate::conversions::SampleTypeConverter;
use crate::{Sample, Source};

mod builder;
mod config;
pub mod sendable;
pub use builder::MicrophoneBuilder;
pub use config::InputConfig;
use cpal::I24;
use cpal::{
traits::{DeviceTrait, HostTrait, StreamTrait},
Device,
};
use rtrb::RingBuffer;
use rtrb::{Producer, RingBuffer};

/// Error that can occur when we can not list the input devices
#[derive(Debug, thiserror::Error, Clone)]
Expand Down Expand Up @@ -165,8 +166,8 @@ pub struct Microphone {
_stream_handle: cpal::Stream,
buffer: rtrb::Consumer<Sample>,
config: InputConfig,
poll_interval: Duration,
error_occurred: Arc<AtomicBool>,
data_signal: Arc<(Mutex<()>, Condvar)>,
}

impl Source for Microphone {
Expand Down Expand Up @@ -197,7 +198,11 @@ impl Iterator for Microphone {
} else if self.error_occurred.load(Ordering::Relaxed) {
return None;
} else {
thread::sleep(self.poll_interval)
// Block until notified instead of sleeping. This eliminates polling overhead and
// reduces jitter by avoiding unnecessary wakeups when no audio data is available.
let (lock, cvar) = &*self.data_signal;
let guard = lock.lock().unwrap();
let _guard = cvar.wait(guard).unwrap();
}
}
}
Expand Down Expand Up @@ -228,63 +233,31 @@ impl Microphone {
config: InputConfig,
mut error_callback: impl FnMut(cpal::StreamError) + Send + 'static,
) -> Result<Self, OpenError> {
let timeout = Some(Duration::from_millis(100));
let hundred_ms_of_samples =
config.channel_count.get() as u32 * config.sample_rate.get() / 10;
let (mut tx, rx) = RingBuffer::new(hundred_ms_of_samples as usize);
// rtrb is faster then all other (ring)buffers: https://github.com/mgeier/rtrb/issues/39
let (tx, rx) = RingBuffer::new(hundred_ms_of_samples as usize);
let error_occurred = Arc::new(AtomicBool::new(false));
let data_signal = Arc::new((Mutex::new(()), Condvar::new()));
let error_callback = {
let error_occurred = error_occurred.clone();
let data_signal = data_signal.clone();
move |source| {
error_occurred.store(true, Ordering::Relaxed);
let (_lock, cvar) = &*data_signal;
cvar.notify_one();
error_callback(source);
}
};

macro_rules! build_input_streams {
($($sample_format:tt, $generic:ty);+) => {
match config.sample_format {
$(
cpal::SampleFormat::$sample_format => device.build_input_stream::<$generic, _, _>(
&config.stream_config(),
move |data, _info| {
for sample in SampleTypeConverter::<_, f32>::new(data.into_iter().copied()) {
let _skip_if_player_is_behind = tx.push(sample);
}
},
error_callback,
timeout,
),
)+
_ => return Err(OpenError::UnsupportedSampleFormat),
}
};
}

let stream = build_input_streams!(
F32, f32;
F64, f64;
I8, i8;
I16, i16;
I24, I24;
I32, i32;
I64, i64;
U8, u8;
U16, u16;
// TODO: uncomment when https://github.com/RustAudio/cpal/pull/1011 is merged
// U24, cpal::U24;
U32, u32;
U64, u64
)
.map_err(OpenError::BuildStream)?;
stream.play().map_err(OpenError::Play)?;
let stream = open_input_stream(device, config, tx, error_callback, data_signal.clone())?;

Ok(Microphone {
_stream_handle: stream,
buffer: rx,
config,
poll_interval: Duration::from_millis(5),
error_occurred,
data_signal,
})
}

Expand All @@ -309,3 +282,63 @@ impl Microphone {
&self.config
}
}

fn open_input_stream(
device: Device,
config: InputConfig,
mut tx: Producer<crate::Sample>,
error_callback: impl FnMut(cpal::StreamError) + Send + 'static,
data_signal: Arc<(Mutex<()>, Condvar)>,
) -> Result<cpal::Stream, OpenError> {
let timeout = Some(Duration::from_millis(100));

macro_rules! build_input_streams {
($($sample_format:tt, $generic:ty);+) => {
match config.sample_format {
$(
cpal::SampleFormat::$sample_format => device.build_input_stream::<$generic, _, _>(
&config.stream_config(),
move |data, _info| {
let mut pushed_any = false;
for sample in SampleTypeConverter::<_, f32>::new(data.into_iter().copied()) {
if tx.push(sample).is_ok() {
pushed_any = true;
}
}

// Notify once per CPAL callback if we pushed any samples.
// This avoids complex sample counting that can get stuck when the ring
// buffer is smaller than the CPAL period, or when the buffer is partially
// full. Each callback represents one input period anyway.
if pushed_any {
let (_lock, cvar) = &*data_signal;
cvar.notify_one();
}
},
error_callback,
timeout,
),
)+
_ => return Err(OpenError::UnsupportedSampleFormat),
}
};
}
let stream = build_input_streams!(
F32, f32;
F64, f64;
I8, i8;
I16, i16;
I24, I24;
I32, i32;
I64, i64;
U8, u8;
U16, u16;
// TODO: uncomment when https://github.com/RustAudio/cpal/pull/1011 is merged
// U24, cpal::U24;
U32, u32;
U64, u64
)
.map_err(OpenError::BuildStream)?;
stream.play().map_err(OpenError::Play)?;
Ok(stream)
}
25 changes: 24 additions & 1 deletion src/microphone/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use cpal::{
};

use crate::{
common::assert_error_traits, microphone::config::InputConfig, ChannelCount, SampleRate,
common::assert_error_traits,
microphone::{config::InputConfig, sendable},
ChannelCount, SampleRate,
};

use super::Microphone;
Expand Down Expand Up @@ -542,4 +544,25 @@ where
self.error_callback.clone(),
)
}
/// Opens the microphone input stream.
///
/// # Example
/// ```no_run
/// # use rodio::microphone::MicrophoneBuilder;
/// # use rodio::Source;
/// # use std::time::Duration;
/// let mic = MicrophoneBuilder::new()
/// .default_device()?
/// .default_config()?
/// .open_stream()?;
/// let recording = mic.take_duration(Duration::from_secs(3)).record();
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
pub fn open_sendable_stream(&self) -> Result<sendable::Microphone, super::OpenError> {
sendable::Microphone::open(
self.device.as_ref().expect("DeviceIsSet").0.clone(),
*self.config.as_ref().expect("ConfigIsSet"),
self.error_callback.clone(),
)
}
}
153 changes: 153 additions & 0 deletions src/microphone/sendable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
//! Slightly less efficient microphone that is Send on all platforms.

// The normal microphone is not send on mac OS as the cpal::stream (handle) is not Send. This
// creates and then keeps that handle on a seperate thread.

use std::{
sync::{
atomic::{AtomicBool, Ordering},
mpsc, Arc, Condvar, Mutex,
},
thread::{self, JoinHandle},
};

use cpal::Device;
use rtrb::RingBuffer;

use crate::{microphone::open_input_stream, Source};
use crate::{
microphone::{InputConfig, OpenError},
Sample,
};

/// Send on all platforms
pub struct Microphone {
_stream_thread: JoinHandle<()>,
buffer: rtrb::Consumer<Sample>,
config: InputConfig,
error_occurred: Arc<AtomicBool>,
data_signal: Arc<(Mutex<()>, Condvar)>,
_drop_tx: mpsc::Sender<()>,
}

impl Microphone {
pub(crate) fn open(
device: Device,
config: InputConfig,
mut error_callback: impl FnMut(cpal::StreamError) + Send + 'static,
) -> Result<Self, OpenError> {
let hundred_ms_of_samples =
config.channel_count.get() as u32 * config.sample_rate.get() / 10;
// Using rtrb (real-time ring buffer) instead of std::sync::mpsc or the ringbuf crate rtrb
// has been benchmarked to be significantly faster in throughput and provides lower latency
// operations.
let (tx, rx) = RingBuffer::new(hundred_ms_of_samples as usize);
let error_occurred = Arc::new(AtomicBool::new(false));
let data_signal = Arc::new((Mutex::new(()), Condvar::new()));
let error_callback = {
let error_occurred = error_occurred.clone();
let data_signal = data_signal.clone();
move |source| {
error_occurred.store(true, Ordering::Relaxed);
let (_lock, cvar) = &*data_signal;
cvar.notify_one();
error_callback(source);
}
};

let (res_tx, res_rx) = mpsc::channel();
let (_drop_tx, drop_rx) = mpsc::channel::<()>();
let data_signal_clone = data_signal.clone();
let _stream_thread = thread::Builder::new()
.name("Rodio sendable microphone".to_string())
.spawn(move || {
match open_input_stream(device, config, tx, error_callback, data_signal_clone) {
Err(e) => {
let _ = res_tx.send(Err(e));
}
Ok(_) => {
let _ = res_tx.send(Ok(()));
// Keep this thread alive until the Microphone struct is dropped
let _should_drop = drop_rx.recv();
}
}
})
.expect("Should be able to spawn threads");

res_rx
.recv()
.expect("input stream thread should never panic")?;

Ok(Microphone {
_stream_thread,
_drop_tx,
buffer: rx,
config,
error_occurred,
data_signal,
})
}

/// Get the configuration.
///
/// # Example
/// Print the sample rate and channel count.
/// ```no_run
/// # use rodio::microphone::MicrophoneBuilder;
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mic = MicrophoneBuilder::new()
/// .default_device()?
/// .default_config()?
/// .open_stream()?;
/// let config = mic.config();
/// println!("Sample rate: {}", config.sample_rate.get());
/// println!("Channel count: {}", config.channel_count.get());
/// # Ok(())
/// # }
/// ```
pub fn config(&self) -> &InputConfig {
&self.config
}
}

impl Source for Microphone {
fn current_span_len(&self) -> Option<usize> {
None
}

fn channels(&self) -> crate::ChannelCount {
self.config.channel_count
}

fn sample_rate(&self) -> crate::SampleRate {
self.config.sample_rate
}

fn total_duration(&self) -> Option<std::time::Duration> {
None
}
}

impl Iterator for Microphone {
type Item = f32;

fn next(&mut self) -> Option<Self::Item> {
loop {
if let Ok(sample) = self.buffer.pop() {
return Some(sample);
} else if self.error_occurred.load(Ordering::Relaxed) {
return None;
} else {
// Block until notified instead of sleeping. This eliminates polling overhead and
// reduces jitter by avoiding unnecessary wakeups when no audio data is available.
let (lock, cvar) = &*self.data_signal;
let guard = lock.lock().unwrap();
let _guard = cvar.wait(guard).unwrap();
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.buffer.slots(), None)
}
}
Loading