Skip to content

Commit 2d8593d

Browse files
committed
adds less efficient microphone variant that is send
`cpal::Stream` is not send on Mac. I need microphone to be send. This 'solves' the non-send stream by parking it on a separate thread.
1 parent e2074c6 commit 2d8593d

File tree

3 files changed

+213
-42
lines changed

3 files changed

+213
-42
lines changed

src/microphone.rs

Lines changed: 51 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -109,14 +109,15 @@ use crate::{Sample, Source};
109109

110110
mod builder;
111111
mod config;
112+
pub mod sendable;
112113
pub use builder::MicrophoneBuilder;
113114
pub use config::InputConfig;
114115
use cpal::I24;
115116
use cpal::{
116117
traits::{DeviceTrait, HostTrait, StreamTrait},
117118
Device,
118119
};
119-
use rtrb::RingBuffer;
120+
use rtrb::{Producer, RingBuffer};
120121

121122
/// Error that can occur when we can not list the input devices
122123
#[derive(Debug, thiserror::Error, Clone)]
@@ -228,10 +229,9 @@ impl Microphone {
228229
config: InputConfig,
229230
mut error_callback: impl FnMut(cpal::StreamError) + Send + 'static,
230231
) -> Result<Self, OpenError> {
231-
let timeout = Some(Duration::from_millis(100));
232232
let hundred_ms_of_samples =
233233
config.channel_count.get() as u32 * config.sample_rate.get() / 10;
234-
let (mut tx, rx) = RingBuffer::new(hundred_ms_of_samples as usize);
234+
let (tx, rx) = RingBuffer::new(hundred_ms_of_samples as usize);
235235
let error_occurred = Arc::new(AtomicBool::new(false));
236236
let error_callback = {
237237
let error_occurred = error_occurred.clone();
@@ -241,43 +241,7 @@ impl Microphone {
241241
}
242242
};
243243

244-
macro_rules! build_input_streams {
245-
($($sample_format:tt, $generic:ty);+) => {
246-
match config.sample_format {
247-
$(
248-
cpal::SampleFormat::$sample_format => device.build_input_stream::<$generic, _, _>(
249-
&config.stream_config(),
250-
move |data, _info| {
251-
for sample in SampleTypeConverter::<_, f32>::new(data.into_iter().copied()) {
252-
let _skip_if_player_is_behind = tx.push(sample);
253-
}
254-
},
255-
error_callback,
256-
timeout,
257-
),
258-
)+
259-
_ => return Err(OpenError::UnsupportedSampleFormat),
260-
}
261-
};
262-
}
263-
264-
let stream = build_input_streams!(
265-
F32, f32;
266-
F64, f64;
267-
I8, i8;
268-
I16, i16;
269-
I24, I24;
270-
I32, i32;
271-
I64, i64;
272-
U8, u8;
273-
U16, u16;
274-
// TODO: uncomment when https://github.com/RustAudio/cpal/pull/1011 is merged
275-
// U24, cpal::U24;
276-
U32, u32;
277-
U64, u64
278-
)
279-
.map_err(OpenError::BuildStream)?;
280-
stream.play().map_err(OpenError::Play)?;
244+
let stream = open_input_stream(device, config, tx, error_callback)?;
281245

282246
Ok(Microphone {
283247
_stream_handle: stream,
@@ -309,3 +273,50 @@ impl Microphone {
309273
&self.config
310274
}
311275
}
276+
277+
fn open_input_stream(
278+
device: Device,
279+
config: InputConfig,
280+
mut tx: Producer<crate::Sample>,
281+
error_callback: impl FnMut(cpal::StreamError) + Send + 'static,
282+
) -> Result<cpal::Stream, OpenError> {
283+
let timeout = Some(Duration::from_millis(100));
284+
285+
macro_rules! build_input_streams {
286+
($($sample_format:tt, $generic:ty);+) => {
287+
match config.sample_format {
288+
$(
289+
cpal::SampleFormat::$sample_format => device.build_input_stream::<$generic, _, _>(
290+
&config.stream_config(),
291+
move |data, _info| {
292+
for sample in SampleTypeConverter::<_, f32>::new(data.into_iter().copied()) {
293+
let _skip_if_player_is_behind = tx.push(sample);
294+
}
295+
},
296+
error_callback,
297+
timeout,
298+
),
299+
)+
300+
_ => return Err(OpenError::UnsupportedSampleFormat),
301+
}
302+
};
303+
}
304+
let stream = build_input_streams!(
305+
F32, f32;
306+
F64, f64;
307+
I8, i8;
308+
I16, i16;
309+
I24, I24;
310+
I32, i32;
311+
I64, i64;
312+
U8, u8;
313+
U16, u16;
314+
// TODO: uncomment when https://github.com/RustAudio/cpal/pull/1011 is merged
315+
// U24, cpal::U24;
316+
U32, u32;
317+
U64, u64
318+
)
319+
.map_err(OpenError::BuildStream)?;
320+
stream.play().map_err(OpenError::Play)?;
321+
Ok(stream)
322+
}

src/microphone/builder.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ use cpal::{
66
};
77

88
use crate::{
9-
common::assert_error_traits, microphone::config::InputConfig, ChannelCount, SampleRate,
9+
common::assert_error_traits,
10+
microphone::{config::InputConfig, sendable},
11+
ChannelCount, SampleRate,
1012
};
1113

1214
use super::Microphone;
@@ -415,7 +417,7 @@ where
415417
/// Sets the buffer size for the input.
416418
///
417419
/// This has no impact on latency, though a too small buffer can lead to audio
418-
/// artifacts if your program can not get samples out of the buffer before they
420+
/// artifacts if your program can not get samples out of the buffer before they
419421
/// get overridden again.
420422
///
421423
/// Normally the default input config will have this set up correctly.
@@ -542,4 +544,25 @@ where
542544
self.error_callback.clone(),
543545
)
544546
}
547+
/// Opens the microphone input stream.
548+
///
549+
/// # Example
550+
/// ```no_run
551+
/// # use rodio::microphone::MicrophoneBuilder;
552+
/// # use rodio::Source;
553+
/// # use std::time::Duration;
554+
/// let mic = MicrophoneBuilder::new()
555+
/// .default_device()?
556+
/// .default_config()?
557+
/// .open_stream()?;
558+
/// let recording = mic.take_duration(Duration::from_secs(3)).record();
559+
/// # Ok::<(), Box<dyn std::error::Error>>(())
560+
/// ```
561+
pub fn open_sendable_stream(&self) -> Result<sendable::Microphone, super::OpenError> {
562+
sendable::Microphone::open(
563+
self.device.as_ref().expect("DeviceIsSet").0.clone(),
564+
*self.config.as_ref().expect("ConfigIsSet"),
565+
self.error_callback.clone(),
566+
)
567+
}
545568
}

src/microphone/sendable.rs

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
//! Slightly less efficient microphone that multiple sources can draw from
2+
//! think of it as an inverse mixer.
3+
4+
use std::{
5+
sync::{
6+
atomic::{AtomicBool, Ordering},
7+
mpsc, Arc,
8+
},
9+
thread::{self, JoinHandle},
10+
time::Duration,
11+
};
12+
13+
use cpal::Device;
14+
use rtrb::RingBuffer;
15+
16+
use crate::{microphone::open_input_stream, Source};
17+
use crate::{
18+
microphone::{InputConfig, OpenError},
19+
Sample,
20+
};
21+
22+
/// Send on all platforms
23+
pub struct Microphone {
24+
_stream_thread: JoinHandle<()>,
25+
buffer: rtrb::Consumer<Sample>,
26+
config: InputConfig,
27+
poll_interval: Duration,
28+
error_occurred: Arc<AtomicBool>,
29+
_drop_tx: mpsc::Sender<()>,
30+
}
31+
32+
impl Microphone {
33+
pub(crate) fn open(
34+
device: Device,
35+
config: InputConfig,
36+
mut error_callback: impl FnMut(cpal::StreamError) + Send + 'static,
37+
) -> Result<Self, OpenError> {
38+
let hundred_ms_of_samples =
39+
config.channel_count.get() as u32 * config.sample_rate.get() / 10;
40+
let (tx, rx) = RingBuffer::new(hundred_ms_of_samples as usize);
41+
let error_occurred = Arc::new(AtomicBool::new(false));
42+
let error_callback = {
43+
let error_occurred = error_occurred.clone();
44+
move |source| {
45+
error_occurred.store(true, Ordering::Relaxed);
46+
error_callback(source);
47+
}
48+
};
49+
50+
let (res_tx, res_rx) = mpsc::channel();
51+
let (_drop_tx, drop_rx) = mpsc::channel::<()>();
52+
let _stream_thread = thread::Builder::new()
53+
.name("Rodio cloneable microphone".to_string())
54+
.spawn(move || {
55+
if let Err(e) = open_input_stream(device, config, tx, error_callback) {
56+
let _ = res_tx.send(Err(e));
57+
} else {
58+
let _ = res_tx.send(Ok(()));
59+
};
60+
61+
let _should_drop = drop_rx.recv();
62+
})
63+
.expect("Should be able to spawn threads");
64+
65+
res_rx
66+
.recv()
67+
.expect("input stream thread should never panic")?;
68+
69+
Ok(Microphone {
70+
_stream_thread,
71+
_drop_tx,
72+
buffer: rx,
73+
config,
74+
poll_interval: Duration::from_millis(5),
75+
error_occurred,
76+
})
77+
}
78+
79+
/// Get the configuration.
80+
///
81+
/// # Example
82+
/// Print the sample rate and channel count.
83+
/// ```no_run
84+
/// # use rodio::microphone::MicrophoneBuilder;
85+
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
86+
/// let mic = MicrophoneBuilder::new()
87+
/// .default_device()?
88+
/// .default_config()?
89+
/// .open_stream()?;
90+
/// let config = mic.config();
91+
/// println!("Sample rate: {}", config.sample_rate.get());
92+
/// println!("Channel count: {}", config.channel_count.get());
93+
/// # Ok(())
94+
/// # }
95+
/// ```
96+
pub fn config(&self) -> &InputConfig {
97+
&self.config
98+
}
99+
}
100+
101+
impl Source for Microphone {
102+
fn current_span_len(&self) -> Option<usize> {
103+
None
104+
}
105+
106+
fn channels(&self) -> crate::ChannelCount {
107+
self.config.channel_count
108+
}
109+
110+
fn sample_rate(&self) -> crate::SampleRate {
111+
self.config.sample_rate
112+
}
113+
114+
fn total_duration(&self) -> Option<std::time::Duration> {
115+
None
116+
}
117+
}
118+
119+
impl Iterator for Microphone {
120+
type Item = f32;
121+
122+
fn next(&mut self) -> Option<Self::Item> {
123+
loop {
124+
if let Ok(sample) = self.buffer.pop() {
125+
return Some(sample);
126+
} else if self.error_occurred.load(Ordering::Relaxed) {
127+
return None;
128+
} else {
129+
thread::sleep(self.poll_interval)
130+
}
131+
}
132+
}
133+
134+
fn size_hint(&self) -> (usize, Option<usize>) {
135+
(self.buffer.slots(), None)
136+
}
137+
}

0 commit comments

Comments
 (0)