diff --git a/src/microphone.rs b/src/microphone.rs index eb98985d..e71ffe59 100644 --- a/src/microphone.rs +++ b/src/microphone.rs @@ -100,8 +100,8 @@ use core::fmt; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use std::{thread, time::Duration}; +use std::sync::{Arc, Condvar, Mutex}; +use std::time::Duration; use crate::common::assert_error_traits; use crate::conversions::SampleTypeConverter; @@ -109,6 +109,7 @@ use crate::{Sample, Source}; mod builder; mod config; +pub mod sendable; pub use builder::MicrophoneBuilder; pub use config::InputConfig; use cpal::I24; @@ -116,7 +117,7 @@ use cpal::{ traits::{DeviceTrait, HostTrait, StreamTrait}, Device, }; -use rtrb::RingBuffer; +use rtrb::{Producer, RingBuffer}; /// Error that can occur when we can not list the input devices #[derive(Debug, thiserror::Error, Clone)] @@ -165,8 +166,8 @@ pub struct Microphone { _stream_handle: cpal::Stream, buffer: rtrb::Consumer, config: InputConfig, - poll_interval: Duration, error_occurred: Arc, + data_signal: Arc<(Mutex<()>, Condvar)>, } impl Source for Microphone { @@ -197,7 +198,11 @@ impl Iterator for Microphone { } else if self.error_occurred.load(Ordering::Relaxed) { return None; } else { - thread::sleep(self.poll_interval) + // Block until notified instead of sleeping. This eliminates polling overhead and + // reduces jitter by avoiding unnecessary wakeups when no audio data is available. + let (lock, cvar) = &*self.data_signal; + let guard = lock.lock().unwrap(); + let _guard = cvar.wait(guard).unwrap(); } } } @@ -228,63 +233,31 @@ impl Microphone { config: InputConfig, mut error_callback: impl FnMut(cpal::StreamError) + Send + 'static, ) -> Result { - let timeout = Some(Duration::from_millis(100)); let hundred_ms_of_samples = config.channel_count.get() as u32 * config.sample_rate.get() / 10; - let (mut tx, rx) = RingBuffer::new(hundred_ms_of_samples as usize); + // rtrb is faster then all other (ring)buffers: https://github.com/mgeier/rtrb/issues/39 + let (tx, rx) = RingBuffer::new(hundred_ms_of_samples as usize); let error_occurred = Arc::new(AtomicBool::new(false)); + let data_signal = Arc::new((Mutex::new(()), Condvar::new())); let error_callback = { let error_occurred = error_occurred.clone(); + let data_signal = data_signal.clone(); move |source| { error_occurred.store(true, Ordering::Relaxed); + let (_lock, cvar) = &*data_signal; + cvar.notify_one(); error_callback(source); } }; - macro_rules! build_input_streams { - ($($sample_format:tt, $generic:ty);+) => { - match config.sample_format { - $( - cpal::SampleFormat::$sample_format => device.build_input_stream::<$generic, _, _>( - &config.stream_config(), - move |data, _info| { - for sample in SampleTypeConverter::<_, f32>::new(data.into_iter().copied()) { - let _skip_if_player_is_behind = tx.push(sample); - } - }, - error_callback, - timeout, - ), - )+ - _ => return Err(OpenError::UnsupportedSampleFormat), - } - }; - } - - let stream = build_input_streams!( - F32, f32; - F64, f64; - I8, i8; - I16, i16; - I24, I24; - I32, i32; - I64, i64; - U8, u8; - U16, u16; - // TODO: uncomment when https://github.com/RustAudio/cpal/pull/1011 is merged - // U24, cpal::U24; - U32, u32; - U64, u64 - ) - .map_err(OpenError::BuildStream)?; - stream.play().map_err(OpenError::Play)?; + let stream = open_input_stream(device, config, tx, error_callback, data_signal.clone())?; Ok(Microphone { _stream_handle: stream, buffer: rx, config, - poll_interval: Duration::from_millis(5), error_occurred, + data_signal, }) } @@ -309,3 +282,63 @@ impl Microphone { &self.config } } + +fn open_input_stream( + device: Device, + config: InputConfig, + mut tx: Producer, + error_callback: impl FnMut(cpal::StreamError) + Send + 'static, + data_signal: Arc<(Mutex<()>, Condvar)>, +) -> Result { + let timeout = Some(Duration::from_millis(100)); + + macro_rules! build_input_streams { + ($($sample_format:tt, $generic:ty);+) => { + match config.sample_format { + $( + cpal::SampleFormat::$sample_format => device.build_input_stream::<$generic, _, _>( + &config.stream_config(), + move |data, _info| { + let mut pushed_any = false; + for sample in SampleTypeConverter::<_, f32>::new(data.into_iter().copied()) { + if tx.push(sample).is_ok() { + pushed_any = true; + } + } + + // Notify once per CPAL callback if we pushed any samples. + // This avoids complex sample counting that can get stuck when the ring + // buffer is smaller than the CPAL period, or when the buffer is partially + // full. Each callback represents one input period anyway. + if pushed_any { + let (_lock, cvar) = &*data_signal; + cvar.notify_one(); + } + }, + error_callback, + timeout, + ), + )+ + _ => return Err(OpenError::UnsupportedSampleFormat), + } + }; + } + let stream = build_input_streams!( + F32, f32; + F64, f64; + I8, i8; + I16, i16; + I24, I24; + I32, i32; + I64, i64; + U8, u8; + U16, u16; + // TODO: uncomment when https://github.com/RustAudio/cpal/pull/1011 is merged + // U24, cpal::U24; + U32, u32; + U64, u64 + ) + .map_err(OpenError::BuildStream)?; + stream.play().map_err(OpenError::Play)?; + Ok(stream) +} diff --git a/src/microphone/builder.rs b/src/microphone/builder.rs index deac72da..1cf8c6dd 100644 --- a/src/microphone/builder.rs +++ b/src/microphone/builder.rs @@ -6,7 +6,9 @@ use cpal::{ }; use crate::{ - common::assert_error_traits, microphone::config::InputConfig, ChannelCount, SampleRate, + common::assert_error_traits, + microphone::{config::InputConfig, sendable}, + ChannelCount, SampleRate, }; use super::Microphone; @@ -542,4 +544,25 @@ where self.error_callback.clone(), ) } + /// Opens the microphone input stream. + /// + /// # Example + /// ```no_run + /// # use rodio::microphone::MicrophoneBuilder; + /// # use rodio::Source; + /// # use std::time::Duration; + /// let mic = MicrophoneBuilder::new() + /// .default_device()? + /// .default_config()? + /// .open_stream()?; + /// let recording = mic.take_duration(Duration::from_secs(3)).record(); + /// # Ok::<(), Box>(()) + /// ``` + pub fn open_sendable_stream(&self) -> Result { + sendable::Microphone::open( + self.device.as_ref().expect("DeviceIsSet").0.clone(), + *self.config.as_ref().expect("ConfigIsSet"), + self.error_callback.clone(), + ) + } } diff --git a/src/microphone/sendable.rs b/src/microphone/sendable.rs new file mode 100644 index 00000000..34bb06e6 --- /dev/null +++ b/src/microphone/sendable.rs @@ -0,0 +1,153 @@ +//! Slightly less efficient microphone that is Send on all platforms. + +// The normal microphone is not send on mac OS as the cpal::stream (handle) is not Send. This +// creates and then keeps that handle on a seperate thread. + +use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc, Arc, Condvar, Mutex, + }, + thread::{self, JoinHandle}, +}; + +use cpal::Device; +use rtrb::RingBuffer; + +use crate::{microphone::open_input_stream, Source}; +use crate::{ + microphone::{InputConfig, OpenError}, + Sample, +}; + +/// Send on all platforms +pub struct Microphone { + _stream_thread: JoinHandle<()>, + buffer: rtrb::Consumer, + config: InputConfig, + error_occurred: Arc, + data_signal: Arc<(Mutex<()>, Condvar)>, + _drop_tx: mpsc::Sender<()>, +} + +impl Microphone { + pub(crate) fn open( + device: Device, + config: InputConfig, + mut error_callback: impl FnMut(cpal::StreamError) + Send + 'static, + ) -> Result { + let hundred_ms_of_samples = + config.channel_count.get() as u32 * config.sample_rate.get() / 10; + // Using rtrb (real-time ring buffer) instead of std::sync::mpsc or the ringbuf crate rtrb + // has been benchmarked to be significantly faster in throughput and provides lower latency + // operations. + let (tx, rx) = RingBuffer::new(hundred_ms_of_samples as usize); + let error_occurred = Arc::new(AtomicBool::new(false)); + let data_signal = Arc::new((Mutex::new(()), Condvar::new())); + let error_callback = { + let error_occurred = error_occurred.clone(); + let data_signal = data_signal.clone(); + move |source| { + error_occurred.store(true, Ordering::Relaxed); + let (_lock, cvar) = &*data_signal; + cvar.notify_one(); + error_callback(source); + } + }; + + let (res_tx, res_rx) = mpsc::channel(); + let (_drop_tx, drop_rx) = mpsc::channel::<()>(); + let data_signal_clone = data_signal.clone(); + let _stream_thread = thread::Builder::new() + .name("Rodio sendable microphone".to_string()) + .spawn(move || { + match open_input_stream(device, config, tx, error_callback, data_signal_clone) { + Err(e) => { + let _ = res_tx.send(Err(e)); + } + Ok(_) => { + let _ = res_tx.send(Ok(())); + // Keep this thread alive until the Microphone struct is dropped + let _should_drop = drop_rx.recv(); + } + } + }) + .expect("Should be able to spawn threads"); + + res_rx + .recv() + .expect("input stream thread should never panic")?; + + Ok(Microphone { + _stream_thread, + _drop_tx, + buffer: rx, + config, + error_occurred, + data_signal, + }) + } + + /// Get the configuration. + /// + /// # Example + /// Print the sample rate and channel count. + /// ```no_run + /// # use rodio::microphone::MicrophoneBuilder; + /// # fn main() -> Result<(), Box> { + /// let mic = MicrophoneBuilder::new() + /// .default_device()? + /// .default_config()? + /// .open_stream()?; + /// let config = mic.config(); + /// println!("Sample rate: {}", config.sample_rate.get()); + /// println!("Channel count: {}", config.channel_count.get()); + /// # Ok(()) + /// # } + /// ``` + pub fn config(&self) -> &InputConfig { + &self.config + } +} + +impl Source for Microphone { + fn current_span_len(&self) -> Option { + None + } + + fn channels(&self) -> crate::ChannelCount { + self.config.channel_count + } + + fn sample_rate(&self) -> crate::SampleRate { + self.config.sample_rate + } + + fn total_duration(&self) -> Option { + None + } +} + +impl Iterator for Microphone { + type Item = f32; + + fn next(&mut self) -> Option { + loop { + if let Ok(sample) = self.buffer.pop() { + return Some(sample); + } else if self.error_occurred.load(Ordering::Relaxed) { + return None; + } else { + // Block until notified instead of sleeping. This eliminates polling overhead and + // reduces jitter by avoiding unnecessary wakeups when no audio data is available. + let (lock, cvar) = &*self.data_signal; + let guard = lock.lock().unwrap(); + let _guard = cvar.wait(guard).unwrap(); + } + } + } + + fn size_hint(&self) -> (usize, Option) { + (self.buffer.slots(), None) + } +}