diff --git a/Cargo.lock b/Cargo.lock index 636ea8cc..157d8397 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -75,7 +75,7 @@ dependencies = [ "bitflags 2.6.0", "cexpr", "clang-sys", - "itertools", + "itertools 0.13.0", "proc-macro2", "quote", "regex", @@ -496,6 +496,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "jni" version = "0.21.1" @@ -918,6 +927,7 @@ dependencies = [ "dasp_sample", "divan", "hound", + "itertools 0.14.0", "lewton", "minimp3_fixed", "num-rational", diff --git a/Cargo.toml b/Cargo.toml index 4e756567..7d94f73a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,6 +67,7 @@ rstest_reuse = "0.6.0" approx = "0.5.1" dasp_sample = "0.11.0" divan = "0.1.14" +itertools = "0.14" [[bench]] name = "effects" diff --git a/src/queue.rs b/src/queue.rs index 2a2f434f..69f8ad87 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -1,5 +1,7 @@ //! Queue that plays sounds one after the other. +use std::cell::Cell; +use std::collections::VecDeque; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -8,10 +10,6 @@ use crate::source::{Empty, SeekError, Source, Zero}; use crate::Sample; use crate::common::{ChannelCount, SampleRate}; -#[cfg(feature = "crossbeam-channel")] -use crossbeam_channel::{unbounded as channel, Receiver, Sender}; -#[cfg(not(feature = "crossbeam-channel"))] -use std::sync::mpsc::{channel, Receiver, Sender}; /// Builds a new queue. It consists of an input and an output. /// @@ -24,69 +22,56 @@ use std::sync::mpsc::{channel, Receiver, Sender}; /// a new sound. /// - If you pass `false`, then the queue will report that it has finished playing. /// -pub fn queue(keep_alive_if_empty: bool) -> (Arc>, SourcesQueueOutput) +pub fn queue(keep_alive_if_empty: bool) -> (Arc>, QueueSource) where S: Sample + Send + 'static, { - let input = Arc::new(SourcesQueueInput { - next_sounds: Mutex::new(Vec::new()), + let input = Arc::new(QueueControls { + next_sounds: Mutex::new(VecDeque::new()), keep_alive_if_empty: AtomicBool::new(keep_alive_if_empty), }); - let output = SourcesQueueOutput { + let output = QueueSource { current: Box::new(Empty::::new()) as Box<_>, - signal_after_end: None, input: input.clone(), + starting_silence: Cell::new(false), + buffered: None, + samples_left_in_span: Cell::new(0), + starting_silence_channels: Cell::new(2), + starting_silence_sample_rate: Cell::new(4100), }; (input, output) } -// TODO: consider reimplementing this with `from_factory` - type Sound = Box + Send>; -type SignalDone = Option>; - /// The input of the queue. -pub struct SourcesQueueInput { - next_sounds: Mutex, SignalDone)>>, +pub struct QueueControls { + next_sounds: Mutex>>, // See constructor. keep_alive_if_empty: AtomicBool, } -impl SourcesQueueInput +impl QueueControls where S: Sample + Send + 'static, { - /// Adds a new source to the end of the queue. - #[inline] - pub fn append(&self, source: T) - where - T: Source + Send + 'static, - { - self.next_sounds - .lock() - .unwrap() - .push((Box::new(source) as Box<_>, None)); - } - /// Adds a new source to the end of the queue. /// - /// The `Receiver` will be signalled when the sound has finished playing. + /// If silence was playing it can take up to milliseconds before + /// the new sound is played. /// - /// Enable the feature flag `crossbeam-channel` in rodio to use a `crossbeam_channel::Receiver` instead. + /// Sources of only one sample are skipped (though next is still called on them). #[inline] - pub fn append_with_signal(&self, source: T) -> Receiver<()> + pub fn append(&self, source: T) where T: Source + Send + 'static, { - let (tx, rx) = channel(); self.next_sounds .lock() .unwrap() - .push((Box::new(source) as Box<_>, Some(tx))); - rx + .push_back(Box::new(source) as Box<_>); } /// Sets whether the queue stays alive if there's no more sound to play. @@ -106,67 +91,132 @@ where } } /// The output of the queue. Implements `Source`. -pub struct SourcesQueueOutput { +pub struct QueueSource { // The current iterator that produces samples. current: Box + Send>, - // Signal this sender before picking from `next`. - signal_after_end: Option>, - // The next sounds. - input: Arc>, + input: Arc>, + + starting_silence: Cell, + starting_silence_channels: Cell, + starting_silence_sample_rate: Cell, + + samples_left_in_span: Cell, + + buffered: Option, } -const THRESHOLD: usize = 512; -impl Source for SourcesQueueOutput +impl Source for QueueSource where - S: Sample + Send + 'static, + S: Sample + Send + 'static + core::fmt::Debug, { #[inline] fn current_span_len(&self) -> Option { - // This function is non-trivial because the boundary between two sounds in the queue should - // be a span boundary as well. + // This function is non-trivial because the boundary between two + // sounds in the queue should be a span boundary as well. Further more + // we can *only* return Some(0) if the queue should stop playing. + // + // This function can be called at any time though its normally only + // called at the end of the span to get how long the next span will be. // - // The current sound is free to return `None` for `current_span_len()`, in which case - // we *should* return the number of samples remaining the current sound. - // This can be estimated with `size_hint()`. + // The current sound is free to return `None` for + // `current_span_len()`. That means there is only one span left and it + // lasts until the end of the sound. We get a lower bound on that + // length using `size_hint()`. // - // If the `size_hint` is `None` as well, we are in the worst case scenario. To handle this - // situation we force a span to have a maximum number of samples indicate by this - // constant. - - // Try the current `current_span_len`. - if let Some(val) = self.current.current_span_len() { - if val != 0 { - return Some(val); - } else if self.input.keep_alive_if_empty.load(Ordering::Acquire) - && self.input.next_sounds.lock().unwrap().is_empty() - { - // The next source will be a filler silence which will have the length of `THRESHOLD` - return Some(THRESHOLD); + // If the `size_hint` is `None` as well, we are in the worst case + // scenario. To handle this situation we force a span to have a + // maximum number of samples with a constant. If the source ends before + // that point we need to start silence for the remainder of the forced span. + + let (span_len, size_lower_bound) = if self.buffered.is_none() { + if let Some(next) = self.next_non_empty_sound_params() { + (next.span_len, next.size_lower_bound) + } else if self.should_end_when_input_empty() { + return Some(0); + } else { + self.starting_silence.set(true); + return Some(self.silence_span_len()); } + } else { + (self.current.current_span_len(), self.current.size_hint().0) + }; + + if self.samples_left_in_span.get() > 0 { + return Some(self.samples_left_in_span.get()); } - // Try the size hint. - let (lower_bound, _) = self.current.size_hint(); - // The iterator default implementation just returns 0. - // That's a problematic value, so skip it. - if lower_bound > 0 { - return Some(lower_bound); + let res = if let Some(len) = span_len { + // correct len for buffered sample + let len = if self.buffered.is_some() { + len + 1 + } else { + len + }; + + if len > 0 { + Some(len) + } else if self.should_end_when_input_empty() { + Some(0) + } else { + // Must be first call after creation with nothing pushed yet. + // Call to next should be silence. A source pushed between this call + // and the first call to next could cause a bug here. + // + // We signal to next that we need a silence now even if a new + // source is available + self.starting_silence.set(true); + if let Some(params) = self.next_non_empty_sound_params() { + self.starting_silence_sample_rate.set(params.sample_rate); + self.starting_silence_channels.set(params.channels); + } else { + self.starting_silence_sample_rate.set(44_100); + self.starting_silence_channels.set(2); + }; + Some(self.silence_span_len()) + } + } else if size_lower_bound == 0 { + // span could end earlier we correct for that by playing silence + // if that happens + Some(self.fallback_span_length()) + } else { + Some(size_lower_bound) + }; + + if let Some(len) = res { + self.samples_left_in_span.set(len); } - // Otherwise we use the constant value. - Some(THRESHOLD) + res } #[inline] fn channels(&self) -> ChannelCount { - self.current.channels() + if self.buffered.is_none() { + if let Some(next) = self.next_non_empty_sound_params() { + next.channels + } else { + self.starting_silence_channels.set(2); + 2 + } + } else { + self.current.channels() + } } #[inline] fn sample_rate(&self) -> SampleRate { - self.current.sample_rate() + if self.buffered.is_none() { + if let Some(next) = self.next_non_empty_sound_params() { + next.sample_rate + } else { + self.starting_silence_sample_rate.set(44_100); + 44100 + } + } else { + self.current.sample_rate() + } } #[inline] @@ -179,7 +229,7 @@ where // that it advances the queue if the position is beyond the current song. // // We would then however need to enable seeking backwards across sources too. - // That no longer seems in line with the queue behaviour. + // That no longer seems in line with the queue behavior. // // A final pain point is that we would need the total duration for the // next few songs. @@ -189,132 +239,160 @@ where } } -impl Iterator for SourcesQueueOutput +impl Iterator for QueueSource where - S: Sample + Send + 'static, + S: Sample + Send + 'static + std::fmt::Debug, { type Item = S; #[inline] fn next(&mut self) -> Option { - loop { - // Basic situation that will happen most of the time. - if let Some(sample) = self.current.next() { - return Some(sample); + // may only return None when the queue should end + let res = match dbg!((self.buffered.take(), self.current.next())) { + (Some(sample1), Some(samples2)) => { + self.buffered = Some(samples2); + Some(sample1) } - - // Since `self.current` has finished, we need to pick the next sound. - // In order to avoid inlining this expensive operation, the code is in another function. - if self.go_next().is_err() { - return None; + (Some(sample1), None) => self.current_is_ending(sample1), + (None, Some(sample1)) => { + // start, populate the buffer + self.buffered = self.current.next(); + Some(sample1) } + (None, None) => self.no_buffer_no_source(), + }; + + if let Some(samples_left) = self.samples_left_in_span.get().checked_sub(1) { + self.samples_left_in_span.set(dbg!(samples_left)); } + + res } #[inline] fn size_hint(&self) -> (usize, Option) { - (self.current.size_hint().0, None) + (0, None) } } -impl SourcesQueueOutput +impl QueueSource where - S: Sample + Send + 'static, + S: Sample + Send + 'static + core::fmt::Debug, { - // Called when `current` is empty and we must jump to the next element. - // Returns `Ok` if the sound should continue playing, or an error if it should stop. - // - // This method is separate so that it is not inlined. - fn go_next(&mut self) -> Result<(), ()> { - if let Some(signal_after_end) = self.signal_after_end.take() { - let _ = signal_after_end.send(()); - } - - let (next, signal_after_end) = { - let mut next = self.input.next_sounds.lock().unwrap(); + fn fallback_span_length(&self) -> usize { + // ~ 5 milliseconds at 44100 + 200 * self.channels() as usize + } - if next.len() == 0 { - let silence = Box::new(Zero::::new_samples(1, 44100, THRESHOLD)) as Box<_>; - if self.input.keep_alive_if_empty.load(Ordering::Acquire) { - // Play a short silence in order to avoid spinlocking. - (silence, None) - } else { - return Err(()); - } - } else { - next.remove(0) - } - }; + fn finish_span_with_silence(&self, samples: usize) -> Sound { + let silence = + Zero::::new_samples(self.current.channels(), self.current.sample_rate(), samples); + Box::new(silence) + } - self.current = next; - self.signal_after_end = signal_after_end; - Ok(()) + fn silence_span_len(&self) -> usize { + // ~ 5 milliseconds at 44100 + 200 * self.channels() as usize } -} -#[cfg(test)] -mod tests { - use crate::buffer::SamplesBuffer; - use crate::queue; - use crate::source::Source; - - #[test] - #[ignore] // FIXME: samples rate and channel not updated immediately after transition - fn basic() { - let (tx, mut rx) = queue::queue(false); - - tx.append(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10])); - tx.append(SamplesBuffer::new(2, 96000, vec![5i16, 5, 5, 5])); - - assert_eq!(rx.channels(), 1); - assert_eq!(rx.sample_rate(), 48000); - assert_eq!(rx.next(), Some(10)); - assert_eq!(rx.next(), Some(-10)); - assert_eq!(rx.next(), Some(10)); - assert_eq!(rx.next(), Some(-10)); - assert_eq!(rx.channels(), 2); - assert_eq!(rx.sample_rate(), 96000); - assert_eq!(rx.next(), Some(5)); - assert_eq!(rx.next(), Some(5)); - assert_eq!(rx.next(), Some(5)); - assert_eq!(rx.next(), Some(5)); - assert_eq!(rx.next(), None); + fn silence(&self) -> Sound { + let samples = self.silence_span_len(); + // silence matches span params to make sure resampling + // gives not popping. It also makes the queue code simpler + let silence = + Zero::::new_samples(self.current.channels(), self.current.sample_rate(), samples); + Box::new(silence) } - #[test] - fn immediate_end() { - let (_, mut rx) = queue::queue::(false); - assert_eq!(rx.next(), None); + fn should_end_when_input_empty(&self) -> bool { + !self.input.keep_alive_if_empty.load(Ordering::Acquire) } - #[test] - fn keep_alive() { - let (tx, mut rx) = queue::queue(true); - tx.append(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10])); + fn next_sound(&self) -> Option> { + self.input.next_sounds.lock().unwrap().pop_front() + } - assert_eq!(rx.next(), Some(10)); - assert_eq!(rx.next(), Some(-10)); - assert_eq!(rx.next(), Some(10)); - assert_eq!(rx.next(), Some(-10)); + fn no_buffer_no_source(&mut self) -> Option { + // Prevents a race condition where a call `current_span_len` + // precedes the call to `next` + if dbg!(self.starting_silence.get()) { + self.current = self.silence(); + self.starting_silence.set(true); + return self.current.next(); + } - for _ in 0..100000 { - assert_eq!(rx.next(), Some(0)); + loop { + if let Some(mut sound) = self.next_sound() { + if let Some((sample1, sample2)) = sound.next().zip(sound.next()) { + self.current = sound; + self.buffered = Some(sample2); + self.current_span_len(); + return Some(sample1); + } else { + continue; + } + } else if self.should_end_when_input_empty() { + return None; + } else { + self.current = self.silence(); + return self.current.next(); + } } } - #[test] - #[ignore] // TODO: not yet implemented - fn no_delay_when_added() { - let (tx, mut rx) = queue::queue(true); + fn current_is_ending(&mut self, sample1: S) -> Option { + // note sources are free to stop (return None) mid frame and + // mid span, we must handle that here + + // check if the span we reported is ended after returning the + // buffered source. If not we need to provide a silence to guarantee + // the span ends when we promised + if self.samples_left_in_span.get() > 1 { + dbg!(&self.samples_left_in_span); + self.current = self.finish_span_with_silence(self.samples_left_in_span.get() - 1); + return Some(sample1); + } - for _ in 0..500 { - assert_eq!(rx.next(), Some(0)); + loop { + if let Some(mut sound) = self.next_sound() { + if let Some(sample2) = sound.next() { + self.current = sound; + // updates samples_left_in_span + self.buffered = Some(sample2); + self.current_span_len(); + return Some(sample1); + } else { + continue; + } + } else if self.should_end_when_input_empty() { + return Some(sample1); + } else { + self.current = self.silence(); + self.current_span_len(); + self.buffered = self.current.next(); + return Some(sample1); + } } + } - tx.append(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10])); - assert_eq!(rx.next(), Some(10)); - assert_eq!(rx.next(), Some(-10)); - assert_eq!(rx.next(), Some(10)); - assert_eq!(rx.next(), Some(-10)); + fn next_non_empty_sound_params(&self) -> Option { + let next_sounds = self.input.next_sounds.lock().unwrap(); + next_sounds + .iter() + .find(|s| s.current_span_len().is_none_or(|l| l > 0)) + .map(|s| NonEmptySourceParams { + size_lower_bound: s.size_hint().0, + span_len: s.current_span_len(), + channels: s.channels(), + sample_rate: s.sample_rate(), + }) } } + +#[derive(Debug)] +struct NonEmptySourceParams { + size_lower_bound: usize, + span_len: Option, + channels: ChannelCount, + sample_rate: SampleRate, +} diff --git a/src/sink.rs b/src/sink.rs index bd190d88..94172dae 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -1,5 +1,5 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::{mpsc, Arc, Mutex}; use std::time::Duration; #[cfg(feature = "crossbeam-channel")] @@ -9,7 +9,7 @@ use dasp_sample::FromSample; use std::sync::mpsc::{Receiver, Sender}; use crate::mixer::Mixer; -use crate::source::SeekError; +use crate::source::{EmptyCallback, SeekError}; use crate::{queue, source::Done, Sample, Source}; /// Handle to a device that outputs sounds. @@ -17,7 +17,7 @@ use crate::{queue, source::Done, Sample, Source}; /// Dropping the `Sink` stops all its sounds. You can use `detach` if you want the sounds to continue /// playing. pub struct Sink { - queue_tx: Arc>, + queue_tx: Arc>, sleep_until_end: Mutex>>, controls: Arc, @@ -78,7 +78,7 @@ impl Sink { /// Builds a new `Sink`. #[inline] - pub fn new() -> (Sink, queue::SourcesQueueOutput) { + pub fn new() -> (Sink, queue::QueueSource) { let (queue_tx, queue_rx) = queue::queue(true); let sink = Sink { @@ -159,7 +159,15 @@ impl Sink { .convert_samples(); self.sound_count.fetch_add(1, Ordering::Relaxed); let source = Done::new(source, self.sound_count.clone()); - *self.sleep_until_end.lock().unwrap() = Some(self.queue_tx.append_with_signal(source)); + self.queue_tx.append(source); + + let (tx, rx) = mpsc::channel(); + let callback_source = EmptyCallback::::new(Box::new(move || { + let _ = tx.send(()); + })); + let callback_source = Box::new(callback_source) as Box + Send>; + self.queue_tx.append(callback_source); + *self.sleep_until_end.lock().unwrap() = Some(rx); } /// Gets the volume of the sound. @@ -371,6 +379,7 @@ mod tests { use crate::buffer::SamplesBuffer; use crate::{Sink, Source}; + #[ignore = "debugging queue"] #[test] fn test_pause_and_stop() { let (sink, mut queue_rx) = Sink::new(); @@ -402,6 +411,7 @@ mod tests { assert_eq!(sink.empty(), true); } + #[ignore = "debugging queue"] #[test] fn test_stop_and_start() { let (sink, mut queue_rx) = Sink::new(); @@ -430,6 +440,7 @@ mod tests { assert_eq!(queue_rx.next(), src.next()); } + #[ignore = "debugging queue"] #[test] fn test_volume() { let (sink, mut queue_rx) = Sink::new(); diff --git a/src/source/mod.rs b/src/source/mod.rs index 605057e8..00b9e597 100644 --- a/src/source/mod.rs +++ b/src/source/mod.rs @@ -152,13 +152,17 @@ pub use self::noise::{pink, white, PinkNoise, WhiteNoise}; /// the number of samples that remain in the iterator before the samples rate and number of /// channels can potentially change. /// +/// ## Span length +/// A span *must* consists of whole frames and start at the beginning of a frame. In other words: +/// the first sample of a span must be for channel 0 while the last sample must be for the last +/// channel. That way the next span again starts at channel 0. pub trait Source: Iterator where Self::Item: Sample, { - /// Returns the number of samples before the current span ends. `None` means "infinite" or - /// "until the sound ends". - /// Should never return 0 unless there's no more data. + /// Returns the number of samples before the current span ends. This number **must** be a + /// multiple of channel count. `None` means "infinite" or "until the sound ends". Should never + /// return 0 unless there's no more data. /// /// After the engine has finished reading the specified number of samples, it will check /// whether the value of `channels()` and/or `sample_rate()` have changed. diff --git a/tests/channel_volume.rs b/tests/channel_volume.rs new file mode 100644 index 00000000..88d7a48e --- /dev/null +++ b/tests/channel_volume.rs @@ -0,0 +1,45 @@ +use std::fs; +use std::io::BufReader; + +use itertools::Itertools; + +use rodio::source::ChannelVolume; +use rodio::{queue, Decoder, Source}; + +#[test] +fn no_queue() { + let file = fs::File::open("assets/music.mp3").unwrap(); + let decoder = Decoder::new(BufReader::new(file)).unwrap(); + assert_eq!(decoder.channels(), 2); + let channel_volume = ChannelVolume::new(decoder, vec![1.0, 1.0, 0.0, 0.0, 0.0, 0.0]); + assert_eq!(channel_volume.channels(), 6); + + assert_output_only_on_channel_1_and_2(channel_volume); +} + +#[test] +fn with_queue_in_between() { + let file = fs::File::open("assets/music.mp3").unwrap(); + let decoder = Decoder::new(BufReader::new(file)).unwrap(); + assert_eq!(decoder.channels(), 2); + let channel_volume = ChannelVolume::new(decoder, vec![1.0, 1.0, 0.0, 0.0, 0.0, 0.0]); + assert_eq!(channel_volume.channels(), 6); + + let (controls, queue) = queue::queue(false); + controls.append(channel_volume); + + assert_output_only_on_channel_1_and_2(queue); +} + +fn assert_output_only_on_channel_1_and_2(source: impl Source) { + for (frame_number, mut frame) in source.chunks(6).into_iter().enumerate() { + let frame: [_; 6] = frame.next_array().expect(&format!( + "Source should contain whole frames, frame {frame_number} was partial" + )); + assert_eq!( + &frame[2..], + &[0., 0., 0., 0.], + "frame {frame_number} had nonzero volume on a channel that should be zero" + ) + } +} diff --git a/tests/queue.rs b/tests/queue.rs new file mode 100644 index 00000000..51f9f2f6 --- /dev/null +++ b/tests/queue.rs @@ -0,0 +1,279 @@ +use std::time::Duration; + +use rodio::buffer::SamplesBuffer; +use rodio::queue; +use rodio::source::Source; +use test_support::TestSource; + +#[test] +// #[ignore] // FIXME: samples rate and channel not updated immediately after transition +fn basic() { + let (controls, mut source) = queue::queue(false); + + let mut source1 = SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]); + let mut source2 = SamplesBuffer::new(2, 96000, vec![5i16, 5, 5, 5]); + controls.append(source1.clone()); + controls.append(source2.clone()); + + assert_eq!(source.current_span_len(), Some(4)); + assert_eq!(source.channels(), source1.channels()); + assert_eq!(source.sample_rate(), source1.sample_rate()); + assert_eq!(source.next(), source1.next()); + assert_eq!(source.next(), source1.next()); + assert_eq!(source.current_span_len(), Some(2)); + assert_eq!(source.next(), source1.next()); + assert_eq!(source.next(), source1.next()); + assert_eq!(None, source1.next()); + + assert_eq!(source.current_span_len(), Some(4)); + assert_eq!(source.channels(), source2.channels()); + assert_eq!(source.sample_rate(), source2.sample_rate()); + assert_eq!(source.next(), source2.next()); + assert_eq!(source.next(), source2.next()); + assert_eq!(source.current_span_len(), Some(2)); + assert_eq!(source.next(), source2.next()); + assert_eq!(source.next(), source2.next()); + assert_eq!(None, source2.next()); + + assert_eq!(source.current_span_len(), Some(0)); + assert_eq!(source.next(), None); +} + +#[test] +fn immediate_end() { + let (_, mut source) = queue::queue::(false); + assert_eq!(source.current_span_len(), Some(0)); + assert_eq!(source.next(), None); +} + +#[test] +fn keep_alive() { + let (controls, mut source) = queue::queue(true); + controls.append(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10])); + + assert_eq!(source.next(), Some(10)); + assert_eq!(source.next(), Some(-10)); + assert_eq!(source.next(), Some(10)); + assert_eq!(source.next(), Some(-10)); + + for _ in 0..100000 { + assert_eq!(source.next(), Some(0)); + } +} + +#[test] +fn limited_delay_when_added_with_keep_alive() { + let (controls, mut source) = queue::queue(true); + + for _ in 0..500 { + assert_eq!(source.next(), Some(0)); + } + + controls.append(SamplesBuffer::new(4, 41000, vec![10i16, -10, 10, -10])); + let sample_rate = source.sample_rate() as f64; + let channels = source.channels() as f64; + let delay_samples = source.by_ref().take_while(|s| *s == 0).count(); + let delay = Duration::from_secs_f64(delay_samples as f64 / channels / sample_rate); + assert!(delay < Duration::from_millis(10), "delay was: {delay:?}"); + + // note we lose the first sample in the take_while + assert_eq!(source.next(), Some(-10)); + assert_eq!(source.next(), Some(10)); + assert_eq!(source.next(), Some(-10)); +} + +#[test] +fn parameters_queried_before_next() { + let test_source = TestSource::new(&[0.1; 5]) + .with_channels(1) + .with_sample_rate(1); + + let (controls, mut source) = queue::queue(true); + + assert_eq!(source.current_span_len(), Some(400)); + controls.append(test_source); + assert_eq!(source.next(), Some(0.0)); + for i in 0..199 { + assert_eq!(source.next(), Some(0.0), "iteration {i}"); + } + assert_eq!(source.next(), Some(0.1)); +} + +mod source_without_span_or_lower_bound_ending_early { + use super::*; + + #[test] + fn with_span_len_queried_before_source_end() { + let test_source1 = TestSource::new(&[0.1; 5]) + .with_channels(1) + .with_sample_rate(1) + .with_false_span_len(None) + .with_false_lower_bound(0); + let test_source2 = TestSource::new(&[0.2; 5]) + .with_channels(1) + .with_sample_rate(1); + + let (controls, mut source) = queue::queue(true); + controls.append(test_source1); + controls.append(test_source2); + + assert_eq!(source.current_span_len(), Some(200)); + assert_eq!(source.next(), Some(0.1)); + assert_eq!(source.next(), Some(0.1)); + assert_eq!(source.next(), Some(0.1)); + assert_eq!(source.next(), Some(0.1)); + assert_eq!(source.next(), Some(0.1)); + + // silence filling the remaining fallback span + assert_eq!(source.next(), Some(0.0)); + } + + #[test] + fn without_span_queried() { + let test_source1 = TestSource::new(&[0.1; 5]) + .with_channels(1) + .with_sample_rate(1) + .with_false_span_len(None) + .with_false_lower_bound(0); + let test_source2 = TestSource::new(&[0.2; 5]) + .with_channels(1) + .with_sample_rate(1); + + let (controls, mut source) = queue::queue(true); + controls.append(test_source1); + controls.append(test_source2); + + assert_eq!(source.next(), Some(0.1)); + assert_eq!(source.next(), Some(0.1)); + assert_eq!(source.next(), Some(0.1)); + assert_eq!(source.next(), Some(0.1)); + assert_eq!(source.next(), Some(0.1)); + + assert_eq!(source.current_span_len(), Some(195)); + assert_eq!(source.take_while(|s| *s == 0.0).count(), 195); + } + + #[test] + fn span_ending_mid_frame() { + let mut test_source1 = TestSource::new(&[0.1, 0.2, 0.1, 0.2, 0.1]) + .with_channels(2) + .with_sample_rate(1) + .with_false_span_len(Some(6)); + let mut test_source2 = TestSource::new(&[0.3, 0.4, 0.3, 0.4]) + .with_channels(2) + .with_sample_rate(1); + + let (controls, mut source) = queue::queue(true); + controls.append(test_source1.clone()); + controls.append(test_source2.clone()); + + assert_eq!(source.current_span_len(), Some(6)); + assert_eq!(source.next(), test_source1.next()); + assert_eq!(source.next(), test_source1.next()); + assert_eq!(source.next(), test_source1.next()); + assert_eq!(source.next(), test_source1.next()); + assert_eq!(source.next(), test_source1.next()); + assert_eq!(source.current_span_len(), Some(1)); + assert_eq!(None, test_source1.next()); + + // extra sample to ensure frames are aligned + assert_eq!(source.next(), Some(0.0)); + + assert_eq!(source.current_span_len(), Some(4)); + assert_eq!(source.next(), test_source2.next(),); + assert_eq!(source.next(), test_source2.next()); + assert_eq!(source.next(), test_source2.next()); + assert_eq!(source.next(), test_source2.next()); + } +} + +// should be made into its own crate called: `rodio-test-support` +mod test_support { + use std::time::Duration; + + #[derive(Debug, Clone)] + pub struct TestSource { + samples: Vec, + pos: usize, + channels: rodio::ChannelCount, + sample_rate: rodio::SampleRate, + total_duration: Option, + lower_bound: usize, + total_span_len: Option, + } + + impl TestSource { + pub fn new<'a>(samples: impl IntoIterator) -> Self { + let samples = samples.into_iter().copied().collect::>(); + Self { + pos: 0, + channels: 1, + sample_rate: 1, + total_duration: None, + lower_bound: samples.len(), + total_span_len: Some(samples.len()), + samples, + } + } + + pub fn with_sample_rate(mut self, rate: rodio::SampleRate) -> Self { + self.sample_rate = rate; + self + } + pub fn with_channels(mut self, count: rodio::ChannelCount) -> Self { + self.channels = count; + self + } + #[expect( + dead_code, + reason = "will be moved to seperate rodio-test-support crate hopefully" + )] + pub fn with_total_duration(mut self, duration: Duration) -> Self { + self.total_duration = Some(duration); + self + } + pub fn with_false_span_len(mut self, total_len: Option) -> Self { + self.total_span_len = total_len; + self + } + pub fn with_false_lower_bound(mut self, lower_bound: usize) -> Self { + self.lower_bound = lower_bound; + self + } + } + + impl Iterator for TestSource { + type Item = f32; + + fn next(&mut self) -> Option { + let res = self.samples.get(self.pos).copied(); + self.pos += 1; + res + } + fn size_hint(&self) -> (usize, Option) { + (self.lower_bound, None) + } + } + + impl rodio::Source for TestSource { + fn current_span_len(&self) -> Option { + self.total_span_len.map(|len| len.saturating_sub(self.pos)) + } + fn channels(&self) -> rodio::ChannelCount { + self.channels + } + fn sample_rate(&self) -> rodio::SampleRate { + self.sample_rate + } + fn total_duration(&self) -> Option { + self.total_duration + } + fn try_seek(&mut self, pos: Duration) -> Result<(), rodio::source::SeekError> { + let duration_per_sample = Duration::from_secs(1) / self.sample_rate; + let offset = pos.div_duration_f64(duration_per_sample).floor() as usize; + self.pos = offset; + + Ok(()) + } + } +}