From 17f495e84d34fbeca476b89d77f8eeb0b3838790 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Maria=C5=84ski?= Date: Sat, 5 Mar 2022 18:09:41 +0100 Subject: [PATCH 01/10] Pipewire host & device --- Cargo.toml | 1 + src/host/mod.rs | 5 + src/host/pipewire/conn.rs | 140 +++++++++++++++++++++ src/host/pipewire/device.rs | 237 ++++++++++++++++++++++++++++++++++++ src/host/pipewire/mod.rs | 123 +++++++++++++++++++ src/platform/mod.rs | 8 +- 6 files changed, 513 insertions(+), 1 deletion(-) create mode 100644 src/host/pipewire/conn.rs create mode 100644 src/host/pipewire/device.rs create mode 100644 src/host/pipewire/mod.rs diff --git a/Cargo.toml b/Cargo.toml index df4079ca7..57e266d42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ alsa = "0.6" libc = "0.2" parking_lot = "0.12" jack = { version = "0.10", optional = true } +pipewire = { git = "https://gitlab.freedesktop.org/pipewire/pipewire-rs", optional = true } [target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies] core-foundation-sys = "0.8.2" # For linking to CoreFoundation.framework and handling device name `CFString`s. diff --git a/src/host/mod.rs b/src/host/mod.rs index 8de06cbe0..f89e2c1b2 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -21,6 +21,11 @@ pub(crate) mod emscripten; feature = "jack" ))] pub(crate) mod jack; +#[cfg(all( + any(target_os = "linux", target_os = "dragonfly", target_os = "freebsd"), + feature = "pipewire" +))] +pub(crate) mod pipewire; pub(crate) mod null; #[cfg(target_os = "android")] pub(crate) mod oboe; diff --git a/src/host/pipewire/conn.rs b/src/host/pipewire/conn.rs new file mode 100644 index 000000000..fe4e0e3f8 --- /dev/null +++ b/src/host/pipewire/conn.rs @@ -0,0 +1,140 @@ +extern crate pipewire; + +use self::pipewire::{ + metadata::Metadata, + prelude::*, + registry::{GlobalObject, Registry}, + spa::ForeignDict, + types::ObjectType, +}; + +use std::{ + cell::{Cell, RefCell}, + rc::Rc, + sync::mpsc, + thread, +}; + +enum Message { + Terminate, + GetSettings, +} + +enum MessageRepl { + Settings(Settings), +} + +pub struct PWClient { + pw_sender: pipewire::channel::Sender, + main_receiver: mpsc::Receiver, +} + +impl PWClient { + pub fn new() -> Self { + let (main_sender, main_receiver) = mpsc::channel(); + let (pw_sender, pw_receiver) = pipewire::channel::channel(); + + let pw_thread = thread::spawn(move || pw_thread(main_sender, pw_receiver)); + + Self { + pw_sender, + main_receiver, + } + } + + pub fn get_settings(&self) -> Settings { + self.pw_sender.send(Message::GetSettings); + + if let MessageRepl::Settings(settings) = self.main_receiver.recv().expect("Reply") { + settings + } else { + Settings::default() + } + } +} + +#[derive(Default)] +struct State { + settings: Settings, +} + +#[derive(Default, Clone)] +struct Settings { + pub sample_rate: u32, + pub min_buffer_size: u32, + pub max_buffer_size: u32, + pub default_buffer_size: u32, +} + +fn pw_thread( + main_sender: mpsc::Sender, + pw_receiver: pipewire::channel::Receiver, +) { + let state = Rc::new(State::default()); + // let state = Rc::new(RefCell::new(State::default())); + + let mainloop = pipewire::MainLoop::new().expect("Failed to create PipeWire Mainloop"); + + let context = pipewire::Context::new(&mainloop).expect("Failed to create PipeWire Context"); + let core = context + .connect(None) + .expect("Failed to connect to PipeWire"); + let registry = Rc::new(core.get_registry().expect("Failed to get Registry")); + + let _receiver = pw_receiver.attach(&mainloop, |msg| { + let mainloop = mainloop.clone(); + + match msg { + Message::Terminate => mainloop.quit(), + Message::GetSettings => { + main_sender.send(MessageRepl::Settings(state.settings.clone())); + } + } + }); + + let state_clone = state.clone(); + let _listener = registry + .add_listener_local() + .global(|global| match global.type_ { + ObjectType::Metadata => handle_metadata(global, state_clone, ®istry), + _ => {} + }); + + mainloop.run(); +} + +fn handle_metadata( + metadata: &GlobalObject, + state: Rc, + registry: &Rc, +) { + let props = metadata + .props + .as_ref() + .expect("Metadata object is missing properties"); + + match props.get("metadata.name") { + Some("settings") => { + let settings: Metadata = registry.bind(metadata).expect("Metadata"); + + settings + .add_listener_local() + .property(|_, key, _, value| { + if let Some(value) = value { + if let Ok(value) = value.parse::() { + match key { + Some("clock.rate") => state.settings.sample_rate = value, + Some("clock.quantum") => state.settings.default_buffer_size = value, + Some("clock.min-quantum") => state.settings.min_buffer_size = value, + Some("clock.max-quantum") => state.settings.max_buffer_size = value, + None => {} + }; + } + } + 0 + }) + .register(); + } + None => {} + }; +} diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs new file mode 100644 index 000000000..1c5e004f6 --- /dev/null +++ b/src/host/pipewire/device.rs @@ -0,0 +1,237 @@ +use crate::{ + BackendSpecificError, BuildStreamError, Data, DefaultStreamConfigError, DeviceNameError, + InputCallbackInfo, OutputCallbackInfo, SampleFormat, SampleRate, StreamConfig, StreamError, + SupportedBufferSize, SupportedStreamConfig, SupportedStreamConfigRange, + SupportedStreamConfigsError, +}; +use std::hash::{Hash, Hasher}; +use std::rc::Rc; +use traits::DeviceTrait; + +use super::stream::Stream; +use super::PIPEWIRE_SAMPLE_FORMAT; + +pub type SupportedInputConfigs = std::vec::IntoIter; +pub type SupportedOutputConfigs = std::vec::IntoIter; + +const DEFAULT_NUM_CHANNELS: u16 = 2; +const DEFAULT_SUPPORTED_CHANNELS: [u16; 10] = [1, 2, 4, 6, 8, 16, 24, 32, 48, 64]; + +/// If a device is for input or output. +/// Until we have duplex stream support PipeWire nodes and CPAL devices for PipeWire will be either input or output. +#[derive(Clone, Debug)] +pub enum DeviceType { + InputDevice, + OutputDevice, +} +#[derive(Clone)] +pub struct Device { + name: String, + sample_rate: SampleRate, + buffer_size: SupportedBufferSize, + device_type: DeviceType, + connect_ports_automatically: bool, + client: Rc +} + +impl Device { + fn new_device( + name: String, + connect_ports_automatically: bool, + device_type: DeviceType, + client: super::conn::PWClient, + ) -> Result { + let settings = client.get_settings(); + + Ok(Device { + name: "cpal_client".to_string(), + sample_rate: SampleRate(settings.sample_rate), + buffer_size: SupportedBufferSize::Range { + min: settings.min_buffer_size, + max: settings.max_buffer_size, + }, + device_type, + connect_ports_automatically, + client: Rc::new(client) + }) + } + + pub fn default_output_device( + name: &str, + connect_ports_automatically: bool, + client: super::conn::PWClient, + ) -> Result { + let output_client_name = format!("{}_out", name); + Device::new_device( + output_client_name, + connect_ports_automatically, + DeviceType::OutputDevice, + client, + ) + } + + pub fn default_input_device( + name: &str, + connect_ports_automatically: bool, + client: super::conn::PWClient, + ) -> Result { + let input_client_name = format!("{}_in", name); + Device::new_device( + input_client_name, + connect_ports_automatically, + DeviceType::InputDevice, + client, + ) + } + + pub fn default_config(&self) -> Result { + let channels = DEFAULT_NUM_CHANNELS; + let sample_rate = self.sample_rate; + let buffer_size = self.buffer_size.clone(); + // The sample format for JACK audio ports is always "32-bit float mono audio" in the current implementation. + // Custom formats are allowed within JACK, but this is of niche interest. + // The format can be found programmatically by calling jack::PortSpec::port_type() on a created port. + let sample_format = PIPEWIRE_SAMPLE_FORMAT; + Ok(SupportedStreamConfig { + channels, + sample_rate, + buffer_size, + sample_format, + }) + } + + pub fn supported_configs(&self) -> Vec { + let f = match self.default_config() { + Err(_) => return vec![], + Ok(f) => f, + }; + + let mut supported_configs = vec![]; + + for &channels in DEFAULT_SUPPORTED_CHANNELS.iter() { + supported_configs.push(SupportedStreamConfigRange { + channels, + min_sample_rate: f.sample_rate, + max_sample_rate: f.sample_rate, + buffer_size: f.buffer_size.clone(), + sample_format: f.sample_format, + }); + } + supported_configs + } + + pub fn is_input(&self) -> bool { + matches!(self.device_type, DeviceType::InputDevice) + } + + pub fn is_output(&self) -> bool { + matches!(self.device_type, DeviceType::OutputDevice) + } +} + +impl DeviceTrait for Device { + type SupportedInputConfigs = SupportedInputConfigs; + type SupportedOutputConfigs = SupportedOutputConfigs; + type Stream = Stream; + + fn name(&self) -> Result { + Ok(self.name.clone()) + } + + fn supported_input_configs( + &self, + ) -> Result { + Ok(self.supported_configs().into_iter()) + } + + fn supported_output_configs( + &self, + ) -> Result { + Ok(self.supported_configs().into_iter()) + } + + /// Returns the default input config + /// The sample format for JACK audio ports is always "32-bit float mono audio" unless using a custom type. + /// The sample rate is set by the JACK server. + fn default_input_config(&self) -> Result { + self.default_config() + } + + /// Returns the default output config + /// The sample format for JACK audio ports is always "32-bit float mono audio" unless using a custom type. + /// The sample rate is set by the JACK server. + fn default_output_config(&self) -> Result { + self.default_config() + } + + fn build_input_stream_raw( + &self, + conf: &StreamConfig, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, + ) -> Result + where + D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + if let DeviceType::OutputDevice = &self.device_type { + // Trying to create an input stream from an output device + return Err(BuildStreamError::StreamConfigNotSupported); + } + if conf.sample_rate != self.sample_rate || sample_format != PIPEWIRE_SAMPLE_FORMAT { + return Err(BuildStreamError::StreamConfigNotSupported); + } + + let mut stream = Stream::new_input(self.client, conf.channels, data_callback, error_callback); + + if self.connect_ports_automatically { + stream.connect_to_system_inputs(); + } + + Ok(stream) + } + + fn build_output_stream_raw( + &self, + conf: &StreamConfig, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, + ) -> Result + where + D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + if let DeviceType::InputDevice = &self.device_type { + // Trying to create an output stream from an input device + return Err(BuildStreamError::StreamConfigNotSupported); + } + if conf.sample_rate != self.sample_rate || sample_format != PIPEWIRE_SAMPLE_FORMAT { + return Err(BuildStreamError::StreamConfigNotSupported); + } + + let mut stream = Stream::new_output(self.client, conf.channels, data_callback, error_callback); + + if self.connect_ports_automatically { + stream.connect_to_system_outputs(); + } + + Ok(stream) + } +} + +impl PartialEq for Device { + fn eq(&self, other: &Self) -> bool { + // Device::name() can never fail in this implementation + self.name().unwrap() == other.name().unwrap() + } +} + +impl Eq for Device {} + +impl Hash for Device { + fn hash(&self, state: &mut H) { + self.name().unwrap().hash(state); + } +} diff --git a/src/host/pipewire/mod.rs b/src/host/pipewire/mod.rs new file mode 100644 index 000000000..67613f5b1 --- /dev/null +++ b/src/host/pipewire/mod.rs @@ -0,0 +1,123 @@ +extern crate pipewire; + +use std::sync::mpsc; + +use crate::{DevicesError, SampleFormat, SupportedStreamConfigRange}; +use traits::HostTrait; + +mod device; +pub use self::device::Device; +pub use self::stream::Stream; +mod stream; +mod conn; + +const PIPEWIRE_SAMPLE_FORMAT: SampleFormat = SampleFormat::F32; + +pub type SupportedInputConfigs = std::vec::IntoIter; +pub type SupportedOutputConfigs = std::vec::IntoIter; +pub type Devices = std::vec::IntoIter; + +/// The PipeWire Host type + +pub struct Host { + /// The name that the client will have in PipeWire. + /// Until we have duplex streams two clients will be created adding "out" or "in" to the name + /// since names have to be unique. + name: String, + /// If ports are to be connected to the system (soundcard) ports automatically (default is true). + connect_ports_automatically: bool, + /// A list of the devices that have been created from this Host. + devices_created: Vec, + + client: conn::PWClient +} + +impl Host { + pub fn new() -> Result { + let client = conn::PWClient::new(); + + let mut host = Host { + name: "cpal_client".to_owned(), + connect_ports_automatically: true, + devices_created: vec![], + client + }; + + // Devices don't exist for PipeWire, they have to be created + host.initialize_default_devices(); + Ok(host) + } + /// Set whether the ports should automatically be connected to system + /// (default is true) + pub fn set_connect_automatically(&mut self, do_connect: bool) { + self.connect_ports_automatically = do_connect; + } + + pub fn input_device_with_name(&mut self, name: &str) -> Option { + self.name = name.to_owned(); + self.default_input_device() + } + + pub fn output_device_with_name(&mut self, name: &str) -> Option { + self.name = name.to_owned(); + self.default_output_device() + } + + fn initialize_default_devices(&mut self) { + let in_device_res = Device::default_input_device( + &self.name, + self.connect_ports_automatically, + self.client + ); + + match in_device_res { + Ok(device) => self.devices_created.push(device), + Err(err) => { + println!("{}", err); + } + } + + let out_device_res = Device::default_output_device( + &self.name, + self.connect_ports_automatically, + self.client + ); + match out_device_res { + Ok(device) => self.devices_created.push(device), + Err(err) => { + println!("{}", err); + } + } + } +} + +impl HostTrait for Host { + type Devices = Devices; + type Device = Device; + + fn is_available() -> bool { + true + } + + fn devices(&self) -> Result { + Ok(self.devices_created.clone().into_iter()) + } + + fn default_input_device(&self) -> Option { + for device in &self.devices_created { + if device.is_input() { + return Some(device.clone()); + } + } + None + } + + fn default_output_device(&self) -> Option { + for device in &self.devices_created { + if device.is_output() { + return Some(device.clone()); + } + } + None + } +} \ No newline at end of file diff --git a/src/platform/mod.rs b/src/platform/mod.rs index 3b566f15f..2e4e75ea2 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -586,8 +586,14 @@ mod platform_impl { SupportedInputConfigs as JackSupportedInputConfigs, SupportedOutputConfigs as JackSupportedOutputConfigs, }; + #[cfg(feature = "pipewire")] + pub use crate::host::pipewire::{ + Device as PipeWireDevice, Devices as PipeWireDevices, Host as PipeWireHost, Stream as PipeWireStream, + SupportedInputConfigs as PipeWireSupportedInputConfigs, + SupportedOutputConfigs as PipeWireupportedOutputConfigs, + }; - impl_platform_host!(#[cfg(feature = "jack")] Jack jack "JACK", Alsa alsa "ALSA"); + impl_platform_host!(#[cfg(feature = "pipewire")] PipeWire pipewire "PipeWire", #[cfg(feature = "jack")] Jack jack "JACK", Alsa alsa "ALSA"); /// The default host for the current compilation target platform. pub fn default_host() -> Host { From c0661756f7672555dd77f6b923956b9649a04e89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Maria=C5=84ski?= Date: Sat, 5 Mar 2022 18:10:42 +0100 Subject: [PATCH 02/10] stream code base copied from jack host, nonfunctional --- src/host/pipewire/stream.rs | 464 ++++++++++++++++++++++++++++++++++++ 1 file changed, 464 insertions(+) create mode 100644 src/host/pipewire/stream.rs diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs new file mode 100644 index 000000000..8876db8da --- /dev/null +++ b/src/host/pipewire/stream.rs @@ -0,0 +1,464 @@ +use crate::ChannelCount; +use std::rc::Rc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; +use traits::StreamTrait; + +use crate::{ + BackendSpecificError, Data, InputCallbackInfo, OutputCallbackInfo, PauseStreamError, + PlayStreamError, SampleRate, StreamError, +}; + +use super::PIPEWIRE_SAMPLE_FORMAT; + +type ErrorCallbackPtr = Arc>; + +pub struct Stream { + // TODO: It might be faster to send a message when playing/pausing than to check this every iteration + playing: Arc, + async_client: jack::AsyncClient, + // Port names are stored in order to connect them to other ports in jack automatically + input_port_names: Vec, + output_port_names: Vec, +} + +impl Stream { + // TODO: Return error messages + pub fn new_input( + client: Rc, + channels: ChannelCount, + data_callback: D, + mut error_callback: E, + ) -> Stream + where + D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + let mut ports = vec![]; + let mut port_names: Vec = vec![]; + // Create ports + for i in 0..channels { + let port_try = client.register_port(&format!("in_{}", i), jack::AudioIn::default()); + match port_try { + Ok(port) => { + // Get the port name in order to later connect it automatically + if let Ok(port_name) = port.name() { + port_names.push(port_name); + } + // Store the port into a Vec to move to the ProcessHandler + ports.push(port); + } + Err(e) => { + // If port creation failed, send the error back via the error_callback + error_callback( + BackendSpecificError { + description: e.to_string(), + } + .into(), + ); + } + } + } + + let playing = Arc::new(AtomicBool::new(true)); + + let error_callback_ptr = Arc::new(Mutex::new(error_callback)) as ErrorCallbackPtr; + + let input_process_handler = LocalProcessHandler::new( + vec![], + ports, + SampleRate(client.sample_rate() as u32), + client.buffer_size() as usize, + Some(Box::new(data_callback)), + None, + playing.clone(), + Arc::clone(&error_callback_ptr), + ); + + let notification_handler = JackNotificationHandler::new(error_callback_ptr); + + let async_client = client + .activate_async(notification_handler, input_process_handler) + .unwrap(); + + Stream { + playing, + async_client, + input_port_names: port_names, + output_port_names: vec![], + } + } + + pub fn new_output( + client: Rc, + channels: ChannelCount, + data_callback: D, + mut error_callback: E, + ) -> Stream + where + D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + let mut ports = vec![]; + let mut port_names: Vec = vec![]; + // Create ports + for i in 0..channels { + let port_try = client.register_port(&format!("out_{}", i), jack::AudioOut::default()); + match port_try { + Ok(port) => { + // Get the port name in order to later connect it automatically + if let Ok(port_name) = port.name() { + port_names.push(port_name); + } + // Store the port into a Vec to move to the ProcessHandler + ports.push(port); + } + Err(e) => { + // If port creation failed, send the error back via the error_callback + error_callback( + BackendSpecificError { + description: e.to_string(), + } + .into(), + ); + } + } + } + + let playing = Arc::new(AtomicBool::new(true)); + + let error_callback_ptr = Arc::new(Mutex::new(error_callback)) as ErrorCallbackPtr; + + let output_process_handler = LocalProcessHandler::new( + ports, + vec![], + SampleRate(client.sample_rate() as u32), + client.buffer_size() as usize, + None, + Some(Box::new(data_callback)), + playing.clone(), + Arc::clone(&error_callback_ptr), + ); + + let notification_handler = JackNotificationHandler::new(error_callback_ptr); + + let async_client = client + .activate_async(notification_handler, output_process_handler) + .unwrap(); + + Stream { + playing, + async_client, + input_port_names: vec![], + output_port_names: port_names, + } + } + + /// Connect to the standard system outputs in jack, system:playback_1 and system:playback_2 + /// This has to be done after the client is activated, doing it just after creating the ports doesn't work. + pub fn connect_to_system_outputs(&mut self) { + // Get the system ports + let system_ports = self.async_client.as_client().ports( + Some("system:playback_.*"), + None, + jack::PortFlags::empty(), + ); + + // Connect outputs from this client to the system playback inputs + for i in 0..self.output_port_names.len() { + if i >= system_ports.len() { + break; + } + match self + .async_client + .as_client() + .connect_ports_by_name(&self.output_port_names[i], &system_ports[i]) + { + Ok(_) => (), + Err(e) => println!("Unable to connect to port with error {}", e), + } + } + } + + /// Connect to the standard system outputs in jack, system:capture_1 and system:capture_2 + /// This has to be done after the client is activated, doing it just after creating the ports doesn't work. + pub fn connect_to_system_inputs(&mut self) { + // Get the system ports + let system_ports = self.async_client.as_client().ports( + Some("system:capture_.*"), + None, + jack::PortFlags::empty(), + ); + + // Connect outputs from this client to the system playback inputs + for i in 0..self.input_port_names.len() { + if i >= system_ports.len() { + break; + } + match self + .async_client + .as_client() + .connect_ports_by_name(&system_ports[i], &self.input_port_names[i]) + { + Ok(_) => (), + Err(e) => println!("Unable to connect to port with error {}", e), + } + } + } +} + +impl StreamTrait for Stream { + fn play(&self) -> Result<(), PlayStreamError> { + self.playing.store(true, Ordering::SeqCst); + Ok(()) + } + + fn pause(&self) -> Result<(), PauseStreamError> { + self.playing.store(false, Ordering::SeqCst); + Ok(()) + } +} + +struct LocalProcessHandler { + /// No new ports are allowed to be created after the creation of the LocalProcessHandler as that would invalidate the buffer sizes + out_ports: Vec>, + in_ports: Vec>, + + sample_rate: SampleRate, + buffer_size: usize, + input_data_callback: Option>, + output_data_callback: Option>, + + // JACK audio samples are 32-bit float (unless you do some custom dark magic) + temp_input_buffer: Vec, + temp_output_buffer: Vec, + playing: Arc, + creation_timestamp: std::time::Instant, + /// This should not be called on `process`, only on `buffer_size` because it can block. + error_callback_ptr: ErrorCallbackPtr, +} + +impl LocalProcessHandler { + fn new( + out_ports: Vec>, + in_ports: Vec>, + sample_rate: SampleRate, + buffer_size: usize, + input_data_callback: Option>, + output_data_callback: Option< + Box, + >, + playing: Arc, + error_callback_ptr: ErrorCallbackPtr, + ) -> Self { + // These may be reallocated in the `buffer_size` callback. + let temp_input_buffer = vec![0.0; in_ports.len() * buffer_size]; + let temp_output_buffer = vec![0.0; out_ports.len() * buffer_size]; + + LocalProcessHandler { + out_ports, + in_ports, + sample_rate, + buffer_size, + input_data_callback, + output_data_callback, + temp_input_buffer, + temp_output_buffer, + playing, + creation_timestamp: std::time::Instant::now(), + error_callback_ptr, + } + } +} + +fn temp_buffer_to_data(temp_input_buffer: &mut Vec, total_buffer_size: usize) -> Data { + let slice = &temp_input_buffer[0..total_buffer_size]; + let data = slice.as_ptr() as *mut (); + let len = total_buffer_size; + let data = unsafe { Data::from_parts(data, len, PIPEWIRE_SAMPLE_FORMAT) }; + data +} + +impl jack::ProcessHandler for LocalProcessHandler { + fn process(&mut self, _: &jack::Client, process_scope: &jack::ProcessScope) -> jack::Control { + if !self.playing.load(Ordering::SeqCst) { + return jack::Control::Continue; + } + + // This should be equal to self.buffer_size, but the implementation will + // work even if it is less. Will panic in `temp_buffer_to_data` if greater. + let current_frame_count = process_scope.n_frames() as usize; + + // Get timestamp data + let cycle_times = process_scope.cycle_times(); + let current_start_usecs = match cycle_times { + Ok(times) => times.current_usecs, + Err(_) => { + // jack was unable to get the current time information + // Fall back to using Instants + let now = std::time::Instant::now(); + let duration = now.duration_since(self.creation_timestamp); + duration.as_micros() as u64 + } + }; + let start_cycle_instant = micros_to_stream_instant(current_start_usecs); + let start_callback_instant = start_cycle_instant + .add(frames_to_duration( + process_scope.frames_since_cycle_start() as usize, + self.sample_rate, + )) + .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + + if let Some(input_callback) = &mut self.input_data_callback { + // Let's get the data from the input ports and run the callback + + let num_in_channels = self.in_ports.len(); + + // Read the data from the input ports into the temporary buffer + // Go through every channel and store its data in the temporary input buffer + for ch_ix in 0..num_in_channels { + let input_channel = &self.in_ports[ch_ix].as_slice(process_scope); + for i in 0..current_frame_count { + self.temp_input_buffer[ch_ix + i * num_in_channels] = input_channel[i]; + } + } + // Create a slice of exactly current_frame_count frames + let data = temp_buffer_to_data( + &mut self.temp_input_buffer, + current_frame_count * num_in_channels, + ); + // Create timestamp + let frames_since_cycle_start = process_scope.frames_since_cycle_start() as usize; + let duration_since_cycle_start = + frames_to_duration(frames_since_cycle_start, self.sample_rate); + let callback = start_callback_instant + .add(duration_since_cycle_start) + .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + let capture = start_callback_instant; + let timestamp = crate::InputStreamTimestamp { callback, capture }; + let info = crate::InputCallbackInfo { timestamp }; + input_callback(&data, &info); + } + + if let Some(output_callback) = &mut self.output_data_callback { + let num_out_channels = self.out_ports.len(); + + // Create a slice of exactly current_frame_count frames + let mut data = temp_buffer_to_data( + &mut self.temp_output_buffer, + current_frame_count * num_out_channels, + ); + // Create timestamp + let frames_since_cycle_start = process_scope.frames_since_cycle_start() as usize; + let duration_since_cycle_start = + frames_to_duration(frames_since_cycle_start, self.sample_rate); + let callback = start_callback_instant + .add(duration_since_cycle_start) + .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + let buffer_duration = frames_to_duration(current_frame_count, self.sample_rate); + let playback = start_cycle_instant + .add(buffer_duration) + .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + let timestamp = crate::OutputStreamTimestamp { callback, playback }; + let info = crate::OutputCallbackInfo { timestamp }; + output_callback(&mut data, &info); + + // Deinterlace + for ch_ix in 0..num_out_channels { + let output_channel = &mut self.out_ports[ch_ix].as_mut_slice(process_scope); + for i in 0..current_frame_count { + output_channel[i] = self.temp_output_buffer[ch_ix + i * num_out_channels]; + } + } + } + + // Continue as normal + jack::Control::Continue + } + + fn buffer_size(&mut self, _: &jack::Client, size: jack::Frames) -> jack::Control { + // The `buffer_size` callback is actually called on the process thread, but + // it does not need to be suitable for real-time use. Thus we can simply allocate + // new buffers here. It is also fine to call the error callback. + // Details: https://github.com/RustAudio/rust-jack/issues/137 + let new_size = size as usize; + if new_size != self.buffer_size { + self.buffer_size = new_size; + self.temp_input_buffer = vec![0.0; self.in_ports.len() * new_size]; + self.temp_output_buffer = vec![0.0; self.out_ports.len() * new_size]; + let description = format!("buffer size changed to: {}", new_size); + if let Ok(mut mutex_guard) = self.error_callback_ptr.lock() { + let err = &mut *mutex_guard; + err(BackendSpecificError { description }.into()); + } + } + + jack::Control::Continue + } +} + +fn micros_to_stream_instant(micros: u64) -> crate::StreamInstant { + let nanos = micros * 1000; + let secs = micros / 1_000_000; + let subsec_nanos = nanos - secs * 1_000_000_000; + crate::StreamInstant::new(secs as i64, subsec_nanos as u32) +} + +// Convert the given duration in frames at the given sample rate to a `std::time::Duration`. +fn frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration { + let secsf = frames as f64 / rate.0 as f64; + let secs = secsf as u64; + let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32; + std::time::Duration::new(secs, nanos) +} + +/// Receives notifications from the JACK server. It is unclear if this may be run concurrent with itself under JACK2 specs +/// so it needs to be Sync. +struct JackNotificationHandler { + error_callback_ptr: ErrorCallbackPtr, + init_sample_rate_flag: Arc, +} + +impl JackNotificationHandler { + pub fn new(error_callback_ptr: ErrorCallbackPtr) -> Self { + JackNotificationHandler { + error_callback_ptr, + init_sample_rate_flag: Arc::new(AtomicBool::new(false)), + } + } + + fn send_error(&mut self, description: String) { + // This thread isn't the audio thread, it's fine to block + if let Ok(mut mutex_guard) = self.error_callback_ptr.lock() { + let err = &mut *mutex_guard; + err(BackendSpecificError { description }.into()); + } + } +} + +impl jack::NotificationHandler for JackNotificationHandler { + fn shutdown(&mut self, _status: jack::ClientStatus, reason: &str) { + self.send_error(format!("JACK was shut down for reason: {}", reason)); + } + + fn sample_rate(&mut self, _: &jack::Client, srate: jack::Frames) -> jack::Control { + match self.init_sample_rate_flag.load(Ordering::SeqCst) { + false => { + // One of these notifications is sent every time a client is started. + self.init_sample_rate_flag.store(true, Ordering::SeqCst); + jack::Control::Continue + } + true => { + self.send_error(format!("sample rate changed to: {}", srate)); + // Since CPAL currently has no way of signaling a sample rate change in order to make + // all necessary changes that would bring we choose to quit. + jack::Control::Quit + } + } + } + + fn xrun(&mut self, _: &jack::Client) -> jack::Control { + self.send_error(String::from("xrun (buffer over or under run)")); + jack::Control::Continue + } +} From 213c19a71326e0c2b5f7cb0557f51c0d1fbe0bcd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Maria=C5=84ski?= Date: Wed, 9 Mar 2022 15:40:12 +0100 Subject: [PATCH 03/10] some fixes, node creation --- Cargo.toml | 2 +- src/host/pipewire/conn.rs | 216 +++++++++++++++++++++++++------ src/host/pipewire/device.rs | 20 +-- src/host/pipewire/mod.rs | 9 +- src/host/pipewire/stream.rs | 250 ++++++++++++++++++------------------ 5 files changed, 320 insertions(+), 177 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 57e266d42..118f9254e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,4 +85,4 @@ name = "feedback" name = "record_wav" [[example]] -name = "synth_tones" +name = "synth_tones" \ No newline at end of file diff --git a/src/host/pipewire/conn.rs b/src/host/pipewire/conn.rs index fe4e0e3f8..1f3000d6c 100644 --- a/src/host/pipewire/conn.rs +++ b/src/host/pipewire/conn.rs @@ -2,26 +2,41 @@ extern crate pipewire; use self::pipewire::{ metadata::Metadata, + node::Node, prelude::*, registry::{GlobalObject, Registry}, - spa::ForeignDict, + spa::{Direction, ForeignDict}, types::ObjectType, + Core, MainLoop, }; use std::{ + borrow::BorrowMut, cell::{Cell, RefCell}, rc::Rc, sync::mpsc, thread, }; +use super::device::DeviceType; + +#[derive(Debug)] enum Message { Terminate, GetSettings, + CreateDeviceNode { + name: String, + device_type: DeviceType, + }, } enum MessageRepl { Settings(Settings), + NodeInfo(NodeInfo), +} + +pub struct NodeInfo { + pub name: String, } pub struct PWClient { @@ -34,7 +49,7 @@ impl PWClient { let (main_sender, main_receiver) = mpsc::channel(); let (pw_sender, pw_receiver) = pipewire::channel::channel(); - let pw_thread = thread::spawn(move || pw_thread(main_sender, pw_receiver)); + let _pw_thread = thread::spawn(move || pw_thread(main_sender, pw_receiver)); Self { pw_sender, @@ -42,24 +57,44 @@ impl PWClient { } } - pub fn get_settings(&self) -> Settings { - self.pw_sender.send(Message::GetSettings); + pub fn get_settings(&self) -> Result { + match self.pw_sender.send(Message::GetSettings) { + Ok(_) => match self.main_receiver.recv() { + Ok(MessageRepl::Settings(settings)) => Ok(settings), + Err(err) => Err(format!("{:?}", err)), + _ => Err(format!("")), + }, + Err(err) => Err(format!("{:?}", err)), + } + } - if let MessageRepl::Settings(settings) = self.main_receiver.recv().expect("Reply") { - settings - } else { - Settings::default() - } + pub fn create_device_node( + &self, + name: String, + device_type: DeviceType, + ) -> Result { + match self + .pw_sender + .send(Message::CreateDeviceNode { name, device_type }) + { + Ok(_) => match self.main_receiver.recv() { + Ok(MessageRepl::NodeInfo(info)) => Ok(info), + Err(err) => Err(format!("{:?}", err)), + _ => Err(format!("")), + }, + Err(err) => Err(format!("{:?}", err)), + } } } #[derive(Default)] struct State { settings: Settings, + running: bool, } -#[derive(Default, Clone)] -struct Settings { +#[derive(Default, Clone, Debug)] +pub struct Settings { pub sample_rate: u32, pub min_buffer_size: u32, pub max_buffer_size: u32, @@ -70,43 +105,104 @@ fn pw_thread( main_sender: mpsc::Sender, pw_receiver: pipewire::channel::Receiver, ) { - let state = Rc::new(State::default()); - // let state = Rc::new(RefCell::new(State::default())); + // let state = Rc::new(State::default()); + let state = Rc::new(RefCell::new(State::default())); let mainloop = pipewire::MainLoop::new().expect("Failed to create PipeWire Mainloop"); let context = pipewire::Context::new(&mainloop).expect("Failed to create PipeWire Context"); - let core = context - .connect(None) - .expect("Failed to connect to PipeWire"); + let core = Rc::new( + context + .connect(None) + .expect("Failed to connect to PipeWire"), + ); let registry = Rc::new(core.get_registry().expect("Failed to get Registry")); - let _receiver = pw_receiver.attach(&mainloop, |msg| { + let _receiver = pw_receiver.attach(&mainloop, { let mainloop = mainloop.clone(); + let state = state.clone(); + let main_sender = main_sender.clone(); + let core = core.clone(); - match msg { + move |msg| match msg { Message::Terminate => mainloop.quit(), Message::GetSettings => { - main_sender.send(MessageRepl::Settings(state.settings.clone())); + let settings = state.borrow().settings.clone(); + main_sender.send(MessageRepl::Settings(settings)); + } + Message::CreateDeviceNode { name, device_type } => { + println!("Creating device"); + let node: Node = core + .create_object( + "adapter", //node_factory.get().expect("No node factory found"), + &pipewire::properties! { + *pipewire::keys::NODE_NAME => name.clone(), + *pipewire::keys::FACTORY_NAME => "support.null-audio-sink", + // *pipewire::keys::MEDIA_CLASS => match device_type { + // DeviceType::InputDevice => "Audio/Sink", + // DeviceType::OutputDevice => "Audio/Source" + // }, + *pipewire::keys::MEDIA_CLASS => "Audio/Sink", + // Don't remove the object on the remote when we destroy our proxy. + // *pipewire::keys::OBJECT_LINGER => "1" + }, + ) + .expect("Failed to create object"); + + let _list = node.add_listener_local() + .info(|f| { + println!("{:?}", f); + }) + .param(|a, b, c, d| { + println!("{}, {}, {}, {}", a,b,c,d); + }) + .register(); + + do_roundtrip(&mainloop, &core, &state); + println!("{:?}", node); + + main_sender.send(MessageRepl::NodeInfo(NodeInfo { name })); + + state.as_ref().borrow_mut().running = false; + mainloop.quit(); } } }); - let state_clone = state.clone(); let _listener = registry .add_listener_local() - .global(|global| match global.type_ { - ObjectType::Metadata => handle_metadata(global, state_clone, ®istry), - _ => {} - }); + .global({ + let state = state.clone(); + let registry = registry.clone(); + let mainloop = mainloop.clone(); + let core = core.clone(); + + move |global| match global.type_ { + ObjectType::Metadata => { + handle_metadata(global, state.clone(), ®istry, &mainloop, &core) + } + _ => {} + } + }) + .register(); + + do_roundtrip(&mainloop, &core, &state); - mainloop.run(); + loop { + if state.borrow().running { + println!("LOOP START"); + mainloop.run(); + println!("LOOP END"); + } + } } fn handle_metadata( metadata: &GlobalObject, - state: Rc, + state: Rc>, registry: &Rc, + mainloop: &MainLoop, + core: &Rc, ) { let props = metadata .props @@ -117,24 +213,66 @@ fn handle_metadata( Some("settings") => { let settings: Metadata = registry.bind(metadata).expect("Metadata"); - settings + let _listener = settings .add_listener_local() - .property(|_, key, _, value| { - if let Some(value) = value { - if let Ok(value) = value.parse::() { - match key { - Some("clock.rate") => state.settings.sample_rate = value, - Some("clock.quantum") => state.settings.default_buffer_size = value, - Some("clock.min-quantum") => state.settings.min_buffer_size = value, - Some("clock.max-quantum") => state.settings.max_buffer_size = value, - None => {} - }; + .property({ + let state = state.clone(); + move |_, key, _, value| { + let mut state = state.as_ref().borrow_mut(); + if let Some(value) = value { + if let Ok(value) = value.parse::() { + match key { + Some("clock.rate") => state.settings.sample_rate = value, + Some("clock.quantum") => { + state.settings.default_buffer_size = value + } + Some("clock.min-quantum") => { + state.settings.min_buffer_size = value + } + Some("clock.max-quantum") => { + state.settings.max_buffer_size = value + } + _ => {} + }; + } } + 0 } - 0 }) .register(); + + do_roundtrip(mainloop, core, &state); } - None => {} + _ => {} }; } + +fn do_roundtrip(mainloop: &pipewire::MainLoop, core: &pipewire::Core, state: &Rc>) { + let done = Rc::new(Cell::new(false)); + let done_clone = done.clone(); + let loop_clone = mainloop.clone(); + let state = state.clone(); + + state.as_ref().borrow_mut().running = false; + mainloop.quit(); + + // Trigger the sync event. The server's answer won't be processed until we start the main loop, + // so we can safely do this before setting up a callback. This lets us avoid using a Cell. + let pending = core.sync(0).expect("sync failed"); + + let _listener_core = core + .add_listener_local() + .done(move |id, seq| { + if id == pipewire::PW_ID_CORE && seq == pending { + done_clone.set(true); + loop_clone.quit(); + } + }) + .register(); + + while !done.get() { + mainloop.run(); + } + + state.as_ref().borrow_mut().running = true; +} diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs index 1c5e004f6..6632395f0 100644 --- a/src/host/pipewire/device.rs +++ b/src/host/pipewire/device.rs @@ -39,12 +39,16 @@ impl Device { name: String, connect_ports_automatically: bool, device_type: DeviceType, - client: super::conn::PWClient, + client: Rc, ) -> Result { - let settings = client.get_settings(); + while client.get_settings().and_then(|s| if s.sample_rate == 0 {Err(String::new())} else {Ok(true)} ).is_err() {} + + let settings = client.get_settings().unwrap(); + + let info = client.create_device_node(name, device_type.clone()).expect("Error creating device"); Ok(Device { - name: "cpal_client".to_string(), + name: info.name, sample_rate: SampleRate(settings.sample_rate), buffer_size: SupportedBufferSize::Range { min: settings.min_buffer_size, @@ -52,14 +56,14 @@ impl Device { }, device_type, connect_ports_automatically, - client: Rc::new(client) + client }) } pub fn default_output_device( name: &str, connect_ports_automatically: bool, - client: super::conn::PWClient, + client: Rc, ) -> Result { let output_client_name = format!("{}_out", name); Device::new_device( @@ -73,7 +77,7 @@ impl Device { pub fn default_input_device( name: &str, connect_ports_automatically: bool, - client: super::conn::PWClient, + client: Rc, ) -> Result { let input_client_name = format!("{}_in", name); Device::new_device( @@ -183,7 +187,7 @@ impl DeviceTrait for Device { return Err(BuildStreamError::StreamConfigNotSupported); } - let mut stream = Stream::new_input(self.client, conf.channels, data_callback, error_callback); + let mut stream = Stream::new_input(self.client.clone(), conf.channels, data_callback, error_callback); if self.connect_ports_automatically { stream.connect_to_system_inputs(); @@ -211,7 +215,7 @@ impl DeviceTrait for Device { return Err(BuildStreamError::StreamConfigNotSupported); } - let mut stream = Stream::new_output(self.client, conf.channels, data_callback, error_callback); + let mut stream = Stream::new_output(self.client.clone(), conf.channels, data_callback, error_callback); if self.connect_ports_automatically { stream.connect_to_system_outputs(); diff --git a/src/host/pipewire/mod.rs b/src/host/pipewire/mod.rs index 67613f5b1..07b970117 100644 --- a/src/host/pipewire/mod.rs +++ b/src/host/pipewire/mod.rs @@ -1,5 +1,6 @@ extern crate pipewire; +use std::rc::Rc; use std::sync::mpsc; use crate::{DevicesError, SampleFormat, SupportedStreamConfigRange}; @@ -29,12 +30,12 @@ pub struct Host { /// A list of the devices that have been created from this Host. devices_created: Vec, - client: conn::PWClient + client: Rc } impl Host { pub fn new() -> Result { - let client = conn::PWClient::new(); + let client = Rc::new(conn::PWClient::new()); let mut host = Host { name: "cpal_client".to_owned(), @@ -67,7 +68,7 @@ impl Host { let in_device_res = Device::default_input_device( &self.name, self.connect_ports_automatically, - self.client + self.client.clone() ); match in_device_res { @@ -80,7 +81,7 @@ impl Host { let out_device_res = Device::default_output_device( &self.name, self.connect_ports_automatically, - self.client + self.client.clone() ); match out_device_res { Ok(device) => self.devices_created.push(device), diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs index 8876db8da..dc4280726 100644 --- a/src/host/pipewire/stream.rs +++ b/src/host/pipewire/stream.rs @@ -16,10 +16,10 @@ type ErrorCallbackPtr = Arc>; pub struct Stream { // TODO: It might be faster to send a message when playing/pausing than to check this every iteration playing: Arc, - async_client: jack::AsyncClient, + // async_client: jack::AsyncClient, // Port names are stored in order to connect them to other ports in jack automatically - input_port_names: Vec, - output_port_names: Vec, + // input_port_names: Vec, + // output_port_names: Vec, } impl Stream { @@ -34,58 +34,58 @@ impl Stream { D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static, { - let mut ports = vec![]; - let mut port_names: Vec = vec![]; + // let mut ports = vec![]; + // let mut port_names: Vec = vec![]; // Create ports - for i in 0..channels { - let port_try = client.register_port(&format!("in_{}", i), jack::AudioIn::default()); - match port_try { - Ok(port) => { - // Get the port name in order to later connect it automatically - if let Ok(port_name) = port.name() { - port_names.push(port_name); - } - // Store the port into a Vec to move to the ProcessHandler - ports.push(port); - } - Err(e) => { - // If port creation failed, send the error back via the error_callback - error_callback( - BackendSpecificError { - description: e.to_string(), - } - .into(), - ); - } - } - } + // for i in 0..channels { + // let port_try = client.register_port(&format!("in_{}", i), jack::AudioIn::default()); + // match port_try { + // Ok(port) => { + // // Get the port name in order to later connect it automatically + // if let Ok(port_name) = port.name() { + // port_names.push(port_name); + // } + // // Store the port into a Vec to move to the ProcessHandler + // ports.push(port); + // } + // Err(e) => { + // // If port creation failed, send the error back via the error_callback + // error_callback( + // BackendSpecificError { + // description: e.to_string(), + // } + // .into(), + // ); + // } + // } + // } let playing = Arc::new(AtomicBool::new(true)); let error_callback_ptr = Arc::new(Mutex::new(error_callback)) as ErrorCallbackPtr; - let input_process_handler = LocalProcessHandler::new( - vec![], - ports, - SampleRate(client.sample_rate() as u32), - client.buffer_size() as usize, - Some(Box::new(data_callback)), - None, - playing.clone(), - Arc::clone(&error_callback_ptr), - ); + // let input_process_handler = LocalProcessHandler::new( + // vec![], + // ports, + // SampleRate(client.sample_rate() as u32), + // client.buffer_size() as usize, + // Some(Box::new(data_callback)), + // None, + // playing.clone(), + // Arc::clone(&error_callback_ptr), + // ); let notification_handler = JackNotificationHandler::new(error_callback_ptr); - let async_client = client - .activate_async(notification_handler, input_process_handler) - .unwrap(); + // let async_client = client + // .activate_async(notification_handler, input_process_handler) + // .unwrap(); Stream { playing, - async_client, - input_port_names: port_names, - output_port_names: vec![], + // async_client, + // input_port_names: port_names, + // output_port_names: vec![], } } @@ -99,58 +99,58 @@ impl Stream { D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static, { - let mut ports = vec![]; - let mut port_names: Vec = vec![]; - // Create ports - for i in 0..channels { - let port_try = client.register_port(&format!("out_{}", i), jack::AudioOut::default()); - match port_try { - Ok(port) => { - // Get the port name in order to later connect it automatically - if let Ok(port_name) = port.name() { - port_names.push(port_name); - } - // Store the port into a Vec to move to the ProcessHandler - ports.push(port); - } - Err(e) => { - // If port creation failed, send the error back via the error_callback - error_callback( - BackendSpecificError { - description: e.to_string(), - } - .into(), - ); - } - } - } + // let mut ports = vec![]; + // let mut port_names: Vec = vec![]; + // // Create ports + // for i in 0..channels { + // let port_try = client.register_port(&format!("out_{}", i), jack::AudioOut::default()); + // match port_try { + // Ok(port) => { + // // Get the port name in order to later connect it automatically + // if let Ok(port_name) = port.name() { + // port_names.push(port_name); + // } + // // Store the port into a Vec to move to the ProcessHandler + // ports.push(port); + // } + // Err(e) => { + // // If port creation failed, send the error back via the error_callback + // error_callback( + // BackendSpecificError { + // description: e.to_string(), + // } + // .into(), + // ); + // } + // } + // } let playing = Arc::new(AtomicBool::new(true)); let error_callback_ptr = Arc::new(Mutex::new(error_callback)) as ErrorCallbackPtr; - let output_process_handler = LocalProcessHandler::new( - ports, - vec![], - SampleRate(client.sample_rate() as u32), - client.buffer_size() as usize, - None, - Some(Box::new(data_callback)), - playing.clone(), - Arc::clone(&error_callback_ptr), - ); + // let output_process_handler = LocalProcessHandler::new( + // ports, + // vec![], + // SampleRate(client.sample_rate() as u32), + // client.buffer_size() as usize, + // None, + // Some(Box::new(data_callback)), + // playing.clone(), + // Arc::clone(&error_callback_ptr), + // ); - let notification_handler = JackNotificationHandler::new(error_callback_ptr); + // let notification_handler = JackNotificationHandler::new(error_callback_ptr); - let async_client = client - .activate_async(notification_handler, output_process_handler) - .unwrap(); + // let async_client = client + // .activate_async(notification_handler, output_process_handler) + // .unwrap(); Stream { playing, - async_client, - input_port_names: vec![], - output_port_names: port_names, + // async_client, + // input_port_names: vec![], + // output_port_names: port_names, } } @@ -158,52 +158,52 @@ impl Stream { /// This has to be done after the client is activated, doing it just after creating the ports doesn't work. pub fn connect_to_system_outputs(&mut self) { // Get the system ports - let system_ports = self.async_client.as_client().ports( - Some("system:playback_.*"), - None, - jack::PortFlags::empty(), - ); - - // Connect outputs from this client to the system playback inputs - for i in 0..self.output_port_names.len() { - if i >= system_ports.len() { - break; - } - match self - .async_client - .as_client() - .connect_ports_by_name(&self.output_port_names[i], &system_ports[i]) - { - Ok(_) => (), - Err(e) => println!("Unable to connect to port with error {}", e), - } - } + // let system_ports = self.async_client.as_client().ports( + // Some("system:playback_.*"), + // None, + // jack::PortFlags::empty(), + // ); + + // // Connect outputs from this client to the system playback inputs + // for i in 0..self.output_port_names.len() { + // if i >= system_ports.len() { + // break; + // } + // match self + // .async_client + // .as_client() + // .connect_ports_by_name(&self.output_port_names[i], &system_ports[i]) + // { + // Ok(_) => (), + // Err(e) => println!("Unable to connect to port with error {}", e), + // } + // } } /// Connect to the standard system outputs in jack, system:capture_1 and system:capture_2 /// This has to be done after the client is activated, doing it just after creating the ports doesn't work. pub fn connect_to_system_inputs(&mut self) { // Get the system ports - let system_ports = self.async_client.as_client().ports( - Some("system:capture_.*"), - None, - jack::PortFlags::empty(), - ); - - // Connect outputs from this client to the system playback inputs - for i in 0..self.input_port_names.len() { - if i >= system_ports.len() { - break; - } - match self - .async_client - .as_client() - .connect_ports_by_name(&system_ports[i], &self.input_port_names[i]) - { - Ok(_) => (), - Err(e) => println!("Unable to connect to port with error {}", e), - } - } + // let system_ports = self.async_client.as_client().ports( + // Some("system:capture_.*"), + // None, + // jack::PortFlags::empty(), + // ); + + // // Connect outputs from this client to the system playback inputs + // for i in 0..self.input_port_names.len() { + // if i >= system_ports.len() { + // break; + // } + // match self + // .async_client + // .as_client() + // .connect_ports_by_name(&system_ports[i], &self.input_port_names[i]) + // { + // Ok(_) => (), + // Err(e) => println!("Unable to connect to port with error {}", e), + // } + // } } } From e601955ecdace0ba06a7e97785240e71c626a737 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Maria=C5=84ski?= Date: Wed, 9 Mar 2022 22:41:54 +0100 Subject: [PATCH 04/10] cleanup, keep references to node/setting proxies and listeners, remove do_roundtrip --- src/host/pipewire/conn.rs | 146 +++++++++++++++++++----------------- src/host/pipewire/device.rs | 2 +- 2 files changed, 79 insertions(+), 69 deletions(-) diff --git a/src/host/pipewire/conn.rs b/src/host/pipewire/conn.rs index 1f3000d6c..75b218dee 100644 --- a/src/host/pipewire/conn.rs +++ b/src/host/pipewire/conn.rs @@ -1,9 +1,10 @@ extern crate pipewire; use self::pipewire::{ - metadata::Metadata, - node::Node, + metadata::{Metadata, MetadataListener}, + node::{Node, NodeListener}, prelude::*, + proxy::Listener, registry::{GlobalObject, Registry}, spa::{Direction, ForeignDict}, types::ObjectType, @@ -13,9 +14,11 @@ use self::pipewire::{ use std::{ borrow::BorrowMut, cell::{Cell, RefCell}, + collections::HashMap, rc::Rc, sync::mpsc, thread, + time::Duration, }; use super::device::DeviceType; @@ -27,6 +30,7 @@ enum Message { CreateDeviceNode { name: String, device_type: DeviceType, + autoconnect: bool, }, } @@ -72,11 +76,13 @@ impl PWClient { &self, name: String, device_type: DeviceType, + connect_ports_automatically: bool, ) -> Result { - match self - .pw_sender - .send(Message::CreateDeviceNode { name, device_type }) - { + match self.pw_sender.send(Message::CreateDeviceNode { + name, + device_type, + autoconnect: connect_ports_automatically, + }) { Ok(_) => match self.main_receiver.recv() { Ok(MessageRepl::NodeInfo(info)) => Ok(info), Err(err) => Err(format!("{:?}", err)), @@ -90,7 +96,7 @@ impl PWClient { #[derive(Default)] struct State { settings: Settings, - running: bool, + nodes: Vec, } #[derive(Default, Clone, Debug)] @@ -101,12 +107,25 @@ pub struct Settings { pub default_buffer_size: u32, } +enum ProxyItem { + Metadata { + _proxy: Metadata, + _listener: MetadataListener, + }, + Node { + _proxy: Node, + _listener: NodeListener, + }, +} + fn pw_thread( main_sender: mpsc::Sender, pw_receiver: pipewire::channel::Receiver, ) { + pipewire::init(); // let state = Rc::new(State::default()); let state = Rc::new(RefCell::new(State::default())); + let proxies = Rc::new(RefCell::new(HashMap::::new())); let mainloop = pipewire::MainLoop::new().expect("Failed to create PipeWire Mainloop"); @@ -130,79 +149,94 @@ fn pw_thread( let settings = state.borrow().settings.clone(); main_sender.send(MessageRepl::Settings(settings)); } - Message::CreateDeviceNode { name, device_type } => { - println!("Creating device"); + Message::CreateDeviceNode { + name, + device_type, + autoconnect, + } => { let node: Node = core .create_object( "adapter", //node_factory.get().expect("No node factory found"), &pipewire::properties! { *pipewire::keys::NODE_NAME => name.clone(), *pipewire::keys::FACTORY_NAME => "support.null-audio-sink", - // *pipewire::keys::MEDIA_CLASS => match device_type { - // DeviceType::InputDevice => "Audio/Sink", - // DeviceType::OutputDevice => "Audio/Source" - // }, - *pipewire::keys::MEDIA_CLASS => "Audio/Sink", + *pipewire::keys::MEDIA_TYPE => "Audio", + *pipewire::keys::MEDIA_CATEGORY => match device_type { + DeviceType::InputDevice => "Capture", + DeviceType::OutputDevice => "Playback" + }, + *pipewire::keys::NODE_AUTOCONNECT => match autoconnect { + false => "false", + true => "true", + }, // Don't remove the object on the remote when we destroy our proxy. // *pipewire::keys::OBJECT_LINGER => "1" }, ) .expect("Failed to create object"); - let _list = node.add_listener_local() + let _listener = node + .add_listener_local() .info(|f| { println!("{:?}", f); }) .param(|a, b, c, d| { - println!("{}, {}, {}, {}", a,b,c,d); + println!("{}, {}, {}, {}", a, b, c, d); }) .register(); - do_roundtrip(&mainloop, &core, &state); println!("{:?}", node); - main_sender.send(MessageRepl::NodeInfo(NodeInfo { name })); + state.as_ref().borrow_mut().nodes.push(node); + + // proxies.as_ref().borrow_mut().insert( + // node.proxy.id(), + // ProxyItem::Node { + // _proxy: node, + // _listener, + // }, + // ); - state.as_ref().borrow_mut().running = false; - mainloop.quit(); + main_sender.send(MessageRepl::NodeInfo(NodeInfo { name })); } } }); - let _listener = registry + let _reg_listener = registry .add_listener_local() .global({ let state = state.clone(); let registry = registry.clone(); - let mainloop = mainloop.clone(); - let core = core.clone(); + let proxies = proxies.clone(); move |global| match global.type_ { - ObjectType::Metadata => { - handle_metadata(global, state.clone(), ®istry, &mainloop, &core) - } + ObjectType::Metadata => handle_metadata(global, &state, ®istry, &proxies), _ => {} } }) .register(); - do_roundtrip(&mainloop, &core, &state); + // let timer = mainloop.add_timer({ + // move |_| { + // } + // }); - loop { - if state.borrow().running { - println!("LOOP START"); - mainloop.run(); - println!("LOOP END"); - } - } + // timer + // .update_timer( + // Some(Duration::from_millis(500)), + // Some(Duration::from_secs(1)), + // ) + // .into_result() + // .expect("FU"); + + mainloop.run(); } fn handle_metadata( metadata: &GlobalObject, - state: Rc>, + state: &Rc>, registry: &Rc, - mainloop: &MainLoop, - core: &Rc, + proxies: &Rc>>, ) { let props = metadata .props @@ -241,38 +275,14 @@ fn handle_metadata( }) .register(); - do_roundtrip(mainloop, core, &state); + proxies.as_ref().borrow_mut().insert( + metadata.id, + ProxyItem::Metadata { + _proxy: settings, + _listener, + }, + ); } _ => {} }; } - -fn do_roundtrip(mainloop: &pipewire::MainLoop, core: &pipewire::Core, state: &Rc>) { - let done = Rc::new(Cell::new(false)); - let done_clone = done.clone(); - let loop_clone = mainloop.clone(); - let state = state.clone(); - - state.as_ref().borrow_mut().running = false; - mainloop.quit(); - - // Trigger the sync event. The server's answer won't be processed until we start the main loop, - // so we can safely do this before setting up a callback. This lets us avoid using a Cell. - let pending = core.sync(0).expect("sync failed"); - - let _listener_core = core - .add_listener_local() - .done(move |id, seq| { - if id == pipewire::PW_ID_CORE && seq == pending { - done_clone.set(true); - loop_clone.quit(); - } - }) - .register(); - - while !done.get() { - mainloop.run(); - } - - state.as_ref().borrow_mut().running = true; -} diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs index 6632395f0..9886e4c2b 100644 --- a/src/host/pipewire/device.rs +++ b/src/host/pipewire/device.rs @@ -45,7 +45,7 @@ impl Device { let settings = client.get_settings().unwrap(); - let info = client.create_device_node(name, device_type.clone()).expect("Error creating device"); + let info = client.create_device_node(name, device_type.clone(), connect_ports_automatically).expect("Error creating device"); Ok(Device { name: info.name, From 75db41d36678a947c74cc43e8f8f1c4e46e4134f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Maria=C5=84ski?= Date: Thu, 10 Mar 2022 00:50:24 +0100 Subject: [PATCH 05/10] make stream.rs compile without jack feature --- src/host/pipewire/stream.rs | 310 ++++++++++++++++++------------------ 1 file changed, 155 insertions(+), 155 deletions(-) diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs index dc4280726..f0bc1a892 100644 --- a/src/host/pipewire/stream.rs +++ b/src/host/pipewire/stream.rs @@ -221,8 +221,8 @@ impl StreamTrait for Stream { struct LocalProcessHandler { /// No new ports are allowed to be created after the creation of the LocalProcessHandler as that would invalidate the buffer sizes - out_ports: Vec>, - in_ports: Vec>, + // out_ports: Vec>, + // in_ports: Vec>, sample_rate: SampleRate, buffer_size: usize, @@ -230,8 +230,8 @@ struct LocalProcessHandler { output_data_callback: Option>, // JACK audio samples are 32-bit float (unless you do some custom dark magic) - temp_input_buffer: Vec, - temp_output_buffer: Vec, + // temp_input_buffer: Vec, + // temp_output_buffer: Vec, playing: Arc, creation_timestamp: std::time::Instant, /// This should not be called on `process`, only on `buffer_size` because it can block. @@ -240,8 +240,8 @@ struct LocalProcessHandler { impl LocalProcessHandler { fn new( - out_ports: Vec>, - in_ports: Vec>, + // out_ports: Vec>, + // in_ports: Vec>, sample_rate: SampleRate, buffer_size: usize, input_data_callback: Option>, @@ -252,18 +252,18 @@ impl LocalProcessHandler { error_callback_ptr: ErrorCallbackPtr, ) -> Self { // These may be reallocated in the `buffer_size` callback. - let temp_input_buffer = vec![0.0; in_ports.len() * buffer_size]; - let temp_output_buffer = vec![0.0; out_ports.len() * buffer_size]; + // let temp_input_buffer = vec![0.0; in_ports.len() * buffer_size]; + // let temp_output_buffer = vec![0.0; out_ports.len() * buffer_size]; LocalProcessHandler { - out_ports, - in_ports, + // out_ports, + // in_ports, sample_rate, buffer_size, input_data_callback, output_data_callback, - temp_input_buffer, - temp_output_buffer, + // temp_input_buffer, + // temp_output_buffer, playing, creation_timestamp: std::time::Instant::now(), error_callback_ptr, @@ -279,123 +279,123 @@ fn temp_buffer_to_data(temp_input_buffer: &mut Vec, total_buffer_size: usiz data } -impl jack::ProcessHandler for LocalProcessHandler { - fn process(&mut self, _: &jack::Client, process_scope: &jack::ProcessScope) -> jack::Control { - if !self.playing.load(Ordering::SeqCst) { - return jack::Control::Continue; - } - - // This should be equal to self.buffer_size, but the implementation will - // work even if it is less. Will panic in `temp_buffer_to_data` if greater. - let current_frame_count = process_scope.n_frames() as usize; - - // Get timestamp data - let cycle_times = process_scope.cycle_times(); - let current_start_usecs = match cycle_times { - Ok(times) => times.current_usecs, - Err(_) => { - // jack was unable to get the current time information - // Fall back to using Instants - let now = std::time::Instant::now(); - let duration = now.duration_since(self.creation_timestamp); - duration.as_micros() as u64 - } - }; - let start_cycle_instant = micros_to_stream_instant(current_start_usecs); - let start_callback_instant = start_cycle_instant - .add(frames_to_duration( - process_scope.frames_since_cycle_start() as usize, - self.sample_rate, - )) - .expect("`playback` occurs beyond representation supported by `StreamInstant`"); - - if let Some(input_callback) = &mut self.input_data_callback { - // Let's get the data from the input ports and run the callback - - let num_in_channels = self.in_ports.len(); - - // Read the data from the input ports into the temporary buffer - // Go through every channel and store its data in the temporary input buffer - for ch_ix in 0..num_in_channels { - let input_channel = &self.in_ports[ch_ix].as_slice(process_scope); - for i in 0..current_frame_count { - self.temp_input_buffer[ch_ix + i * num_in_channels] = input_channel[i]; - } - } - // Create a slice of exactly current_frame_count frames - let data = temp_buffer_to_data( - &mut self.temp_input_buffer, - current_frame_count * num_in_channels, - ); - // Create timestamp - let frames_since_cycle_start = process_scope.frames_since_cycle_start() as usize; - let duration_since_cycle_start = - frames_to_duration(frames_since_cycle_start, self.sample_rate); - let callback = start_callback_instant - .add(duration_since_cycle_start) - .expect("`playback` occurs beyond representation supported by `StreamInstant`"); - let capture = start_callback_instant; - let timestamp = crate::InputStreamTimestamp { callback, capture }; - let info = crate::InputCallbackInfo { timestamp }; - input_callback(&data, &info); - } - - if let Some(output_callback) = &mut self.output_data_callback { - let num_out_channels = self.out_ports.len(); - - // Create a slice of exactly current_frame_count frames - let mut data = temp_buffer_to_data( - &mut self.temp_output_buffer, - current_frame_count * num_out_channels, - ); - // Create timestamp - let frames_since_cycle_start = process_scope.frames_since_cycle_start() as usize; - let duration_since_cycle_start = - frames_to_duration(frames_since_cycle_start, self.sample_rate); - let callback = start_callback_instant - .add(duration_since_cycle_start) - .expect("`playback` occurs beyond representation supported by `StreamInstant`"); - let buffer_duration = frames_to_duration(current_frame_count, self.sample_rate); - let playback = start_cycle_instant - .add(buffer_duration) - .expect("`playback` occurs beyond representation supported by `StreamInstant`"); - let timestamp = crate::OutputStreamTimestamp { callback, playback }; - let info = crate::OutputCallbackInfo { timestamp }; - output_callback(&mut data, &info); - - // Deinterlace - for ch_ix in 0..num_out_channels { - let output_channel = &mut self.out_ports[ch_ix].as_mut_slice(process_scope); - for i in 0..current_frame_count { - output_channel[i] = self.temp_output_buffer[ch_ix + i * num_out_channels]; - } - } - } - - // Continue as normal - jack::Control::Continue - } - - fn buffer_size(&mut self, _: &jack::Client, size: jack::Frames) -> jack::Control { - // The `buffer_size` callback is actually called on the process thread, but - // it does not need to be suitable for real-time use. Thus we can simply allocate - // new buffers here. It is also fine to call the error callback. - // Details: https://github.com/RustAudio/rust-jack/issues/137 - let new_size = size as usize; - if new_size != self.buffer_size { - self.buffer_size = new_size; - self.temp_input_buffer = vec![0.0; self.in_ports.len() * new_size]; - self.temp_output_buffer = vec![0.0; self.out_ports.len() * new_size]; - let description = format!("buffer size changed to: {}", new_size); - if let Ok(mut mutex_guard) = self.error_callback_ptr.lock() { - let err = &mut *mutex_guard; - err(BackendSpecificError { description }.into()); - } - } - - jack::Control::Continue - } -} +// impl jack::ProcessHandler for LocalProcessHandler { +// fn process(&mut self, _: &jack::Client, process_scope: &jack::ProcessScope) -> jack::Control { +// if !self.playing.load(Ordering::SeqCst) { +// return jack::Control::Continue; +// } + +// // This should be equal to self.buffer_size, but the implementation will +// // work even if it is less. Will panic in `temp_buffer_to_data` if greater. +// let current_frame_count = process_scope.n_frames() as usize; + +// // Get timestamp data +// let cycle_times = process_scope.cycle_times(); +// let current_start_usecs = match cycle_times { +// Ok(times) => times.current_usecs, +// Err(_) => { +// // jack was unable to get the current time information +// // Fall back to using Instants +// let now = std::time::Instant::now(); +// let duration = now.duration_since(self.creation_timestamp); +// duration.as_micros() as u64 +// } +// }; +// let start_cycle_instant = micros_to_stream_instant(current_start_usecs); +// let start_callback_instant = start_cycle_instant +// .add(frames_to_duration( +// process_scope.frames_since_cycle_start() as usize, +// self.sample_rate, +// )) +// .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + +// if let Some(input_callback) = &mut self.input_data_callback { +// // Let's get the data from the input ports and run the callback + +// let num_in_channels = self.in_ports.len(); + +// // Read the data from the input ports into the temporary buffer +// // Go through every channel and store its data in the temporary input buffer +// for ch_ix in 0..num_in_channels { +// let input_channel = &self.in_ports[ch_ix].as_slice(process_scope); +// for i in 0..current_frame_count { +// self.temp_input_buffer[ch_ix + i * num_in_channels] = input_channel[i]; +// } +// } +// // Create a slice of exactly current_frame_count frames +// let data = temp_buffer_to_data( +// &mut self.temp_input_buffer, +// current_frame_count * num_in_channels, +// ); +// // Create timestamp +// let frames_since_cycle_start = process_scope.frames_since_cycle_start() as usize; +// let duration_since_cycle_start = +// frames_to_duration(frames_since_cycle_start, self.sample_rate); +// let callback = start_callback_instant +// .add(duration_since_cycle_start) +// .expect("`playback` occurs beyond representation supported by `StreamInstant`"); +// let capture = start_callback_instant; +// let timestamp = crate::InputStreamTimestamp { callback, capture }; +// let info = crate::InputCallbackInfo { timestamp }; +// input_callback(&data, &info); +// } + +// if let Some(output_callback) = &mut self.output_data_callback { +// let num_out_channels = self.out_ports.len(); + +// // Create a slice of exactly current_frame_count frames +// let mut data = temp_buffer_to_data( +// &mut self.temp_output_buffer, +// current_frame_count * num_out_channels, +// ); +// // Create timestamp +// let frames_since_cycle_start = process_scope.frames_since_cycle_start() as usize; +// let duration_since_cycle_start = +// frames_to_duration(frames_since_cycle_start, self.sample_rate); +// let callback = start_callback_instant +// .add(duration_since_cycle_start) +// .expect("`playback` occurs beyond representation supported by `StreamInstant`"); +// let buffer_duration = frames_to_duration(current_frame_count, self.sample_rate); +// let playback = start_cycle_instant +// .add(buffer_duration) +// .expect("`playback` occurs beyond representation supported by `StreamInstant`"); +// let timestamp = crate::OutputStreamTimestamp { callback, playback }; +// let info = crate::OutputCallbackInfo { timestamp }; +// output_callback(&mut data, &info); + +// // Deinterlace +// for ch_ix in 0..num_out_channels { +// let output_channel = &mut self.out_ports[ch_ix].as_mut_slice(process_scope); +// for i in 0..current_frame_count { +// output_channel[i] = self.temp_output_buffer[ch_ix + i * num_out_channels]; +// } +// } +// } + +// // Continue as normal +// jack::Control::Continue +// } + +// fn buffer_size(&mut self, _: &jack::Client, size: jack::Frames) -> jack::Control { +// // The `buffer_size` callback is actually called on the process thread, but +// // it does not need to be suitable for real-time use. Thus we can simply allocate +// // new buffers here. It is also fine to call the error callback. +// // Details: https://github.com/RustAudio/rust-jack/issues/137 +// let new_size = size as usize; +// if new_size != self.buffer_size { +// self.buffer_size = new_size; +// self.temp_input_buffer = vec![0.0; self.in_ports.len() * new_size]; +// self.temp_output_buffer = vec![0.0; self.out_ports.len() * new_size]; +// let description = format!("buffer size changed to: {}", new_size); +// if let Ok(mut mutex_guard) = self.error_callback_ptr.lock() { +// let err = &mut *mutex_guard; +// err(BackendSpecificError { description }.into()); +// } +// } + +// jack::Control::Continue +// } +// } fn micros_to_stream_instant(micros: u64) -> crate::StreamInstant { let nanos = micros * 1000; @@ -436,29 +436,29 @@ impl JackNotificationHandler { } } -impl jack::NotificationHandler for JackNotificationHandler { - fn shutdown(&mut self, _status: jack::ClientStatus, reason: &str) { - self.send_error(format!("JACK was shut down for reason: {}", reason)); - } - - fn sample_rate(&mut self, _: &jack::Client, srate: jack::Frames) -> jack::Control { - match self.init_sample_rate_flag.load(Ordering::SeqCst) { - false => { - // One of these notifications is sent every time a client is started. - self.init_sample_rate_flag.store(true, Ordering::SeqCst); - jack::Control::Continue - } - true => { - self.send_error(format!("sample rate changed to: {}", srate)); - // Since CPAL currently has no way of signaling a sample rate change in order to make - // all necessary changes that would bring we choose to quit. - jack::Control::Quit - } - } - } - - fn xrun(&mut self, _: &jack::Client) -> jack::Control { - self.send_error(String::from("xrun (buffer over or under run)")); - jack::Control::Continue - } -} +// impl jack::NotificationHandler for JackNotificationHandler { +// fn shutdown(&mut self, _status: jack::ClientStatus, reason: &str) { +// self.send_error(format!("JACK was shut down for reason: {}", reason)); +// } + +// fn sample_rate(&mut self, _: &jack::Client, srate: jack::Frames) -> jack::Control { +// match self.init_sample_rate_flag.load(Ordering::SeqCst) { +// false => { +// // One of these notifications is sent every time a client is started. +// self.init_sample_rate_flag.store(true, Ordering::SeqCst); +// jack::Control::Continue +// } +// true => { +// self.send_error(format!("sample rate changed to: {}", srate)); +// // Since CPAL currently has no way of signaling a sample rate change in order to make +// // all necessary changes that would bring we choose to quit. +// jack::Control::Quit +// } +// } +// } + +// fn xrun(&mut self, _: &jack::Client) -> jack::Control { +// self.send_error(String::from("xrun (buffer over or under run)")); +// jack::Control::Continue +// } +// } From 53ec73b2a4e4d42a5267497560530c3825184ff0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Maria=C5=84ski?= Date: Thu, 10 Mar 2022 00:51:44 +0100 Subject: [PATCH 06/10] keep references to node listeners --- src/host/pipewire/conn.rs | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/src/host/pipewire/conn.rs b/src/host/pipewire/conn.rs index 75b218dee..2586dec2a 100644 --- a/src/host/pipewire/conn.rs +++ b/src/host/pipewire/conn.rs @@ -96,7 +96,7 @@ impl PWClient { #[derive(Default)] struct State { settings: Settings, - nodes: Vec, + nodes: Vec, } #[derive(Default, Clone, Debug)] @@ -125,7 +125,7 @@ fn pw_thread( pipewire::init(); // let state = Rc::new(State::default()); let state = Rc::new(RefCell::new(State::default())); - let proxies = Rc::new(RefCell::new(HashMap::::new())); + let proxies = Rc::new(RefCell::new(HashMap::new())); let mainloop = pipewire::MainLoop::new().expect("Failed to create PipeWire Mainloop"); @@ -142,6 +142,7 @@ fn pw_thread( let state = state.clone(); let main_sender = main_sender.clone(); let core = core.clone(); + let proxies = proxies.clone(); move |msg| match msg { Message::Terminate => mainloop.quit(), @@ -177,8 +178,8 @@ fn pw_thread( let _listener = node .add_listener_local() - .info(|f| { - println!("{:?}", f); + .info(|info| { + // println!("{:?}", info); }) .param(|a, b, c, d| { println!("{}, {}, {}, {}", a, b, c, d); @@ -187,15 +188,10 @@ fn pw_thread( println!("{:?}", node); - state.as_ref().borrow_mut().nodes.push(node); - - // proxies.as_ref().borrow_mut().insert( - // node.proxy.id(), - // ProxyItem::Node { - // _proxy: node, - // _listener, - // }, - // ); + state.as_ref().borrow_mut().nodes.push(ProxyItem::Node { + _proxy: node, + _listener, + }); main_sender.send(MessageRepl::NodeInfo(NodeInfo { name })); } From 352d762898bf6ff6c15b23b18b15981f5dc7e2e3 Mon Sep 17 00:00:00 2001 From: ImUrX Date: Sat, 17 Sep 2022 20:03:53 -0300 Subject: [PATCH 07/10] enable pipewire by default --- src/host/pipewire/device.rs | 2 +- src/host/pipewire/mod.rs | 2 +- src/host/pipewire/stream.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs index 9886e4c2b..31daf0c8a 100644 --- a/src/host/pipewire/device.rs +++ b/src/host/pipewire/device.rs @@ -6,7 +6,7 @@ use crate::{ }; use std::hash::{Hash, Hasher}; use std::rc::Rc; -use traits::DeviceTrait; +use crate::traits::DeviceTrait; use super::stream::Stream; use super::PIPEWIRE_SAMPLE_FORMAT; diff --git a/src/host/pipewire/mod.rs b/src/host/pipewire/mod.rs index 07b970117..c883e32a3 100644 --- a/src/host/pipewire/mod.rs +++ b/src/host/pipewire/mod.rs @@ -4,7 +4,7 @@ use std::rc::Rc; use std::sync::mpsc; use crate::{DevicesError, SampleFormat, SupportedStreamConfigRange}; -use traits::HostTrait; +use crate::traits::HostTrait; mod device; pub use self::device::Device; diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs index f0bc1a892..b89434f3f 100644 --- a/src/host/pipewire/stream.rs +++ b/src/host/pipewire/stream.rs @@ -2,7 +2,7 @@ use crate::ChannelCount; use std::rc::Rc; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; -use traits::StreamTrait; +use crate::traits::StreamTrait; use crate::{ BackendSpecificError, Data, InputCallbackInfo, OutputCallbackInfo, PauseStreamError, From 483e3440540be20cf3570c3b984ef69f75a15d1b Mon Sep 17 00:00:00 2001 From: ImUrX Date: Wed, 21 Sep 2022 15:03:00 -0300 Subject: [PATCH 08/10] add enumeration --- Cargo.toml | 1 + src/host/mod.rs | 6 +-- src/host/pipewire/conn.rs | 89 ++++++++++++++++++++++++++++++++++--- src/host/pipewire/device.rs | 74 +++++++++++++++++++++--------- src/host/pipewire/mod.rs | 14 +++--- src/host/pipewire/stream.rs | 3 +- src/platform/mod.rs | 4 +- 7 files changed, 148 insertions(+), 43 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 118f9254e..e663a09d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ alsa = "0.6" libc = "0.2" parking_lot = "0.12" jack = { version = "0.10", optional = true } +intmap = "2.0" pipewire = { git = "https://gitlab.freedesktop.org/pipewire/pipewire-rs", optional = true } [target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies] diff --git a/src/host/mod.rs b/src/host/mod.rs index f89e2c1b2..80d735083 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -21,14 +21,14 @@ pub(crate) mod emscripten; feature = "jack" ))] pub(crate) mod jack; +pub(crate) mod null; +#[cfg(target_os = "android")] +pub(crate) mod oboe; #[cfg(all( any(target_os = "linux", target_os = "dragonfly", target_os = "freebsd"), feature = "pipewire" ))] pub(crate) mod pipewire; -pub(crate) mod null; -#[cfg(target_os = "android")] -pub(crate) mod oboe; #[cfg(windows)] pub(crate) mod wasapi; #[cfg(all(target_arch = "wasm32", feature = "wasm-bindgen"))] diff --git a/src/host/pipewire/conn.rs b/src/host/pipewire/conn.rs index 2586dec2a..685655939 100644 --- a/src/host/pipewire/conn.rs +++ b/src/host/pipewire/conn.rs @@ -1,5 +1,9 @@ extern crate pipewire; +use intmap::IntMap; + +use super::Device; + use self::pipewire::{ metadata::{Metadata, MetadataListener}, node::{Node, NodeListener}, @@ -32,6 +36,7 @@ enum Message { device_type: DeviceType, autoconnect: bool, }, + EnumerateDevices, } enum MessageRepl { @@ -41,6 +46,7 @@ enum MessageRepl { pub struct NodeInfo { pub name: String, + pub id: u32, } pub struct PWClient { @@ -97,11 +103,12 @@ impl PWClient { struct State { settings: Settings, nodes: Vec, + devices: IntMap, } #[derive(Default, Clone, Debug)] pub struct Settings { - pub sample_rate: u32, + pub allowed_sample_rates: Vec, pub min_buffer_size: u32, pub max_buffer_size: u32, pub default_buffer_size: u32, @@ -148,7 +155,9 @@ fn pw_thread( Message::Terminate => mainloop.quit(), Message::GetSettings => { let settings = state.borrow().settings.clone(); - main_sender.send(MessageRepl::Settings(settings)); + main_sender + .send(MessageRepl::Settings(settings)) + .expect("Failed to send settings"); } Message::CreateDeviceNode { name, @@ -176,25 +185,32 @@ fn pw_thread( ) .expect("Failed to create object"); + let id = Rc::new(Cell::new(0)); + let id_clone = id.clone(); let _listener = node .add_listener_local() - .info(|info| { - // println!("{:?}", info); + .info(move |info| { + id_clone.set(info.id()); }) .param(|a, b, c, d| { println!("{}, {}, {}, {}", a, b, c, d); }) .register(); - println!("{:?}", node); + while id.get() == 0 { + mainloop.run(); + } state.as_ref().borrow_mut().nodes.push(ProxyItem::Node { _proxy: node, _listener, }); - main_sender.send(MessageRepl::NodeInfo(NodeInfo { name })); + main_sender + .send(MessageRepl::NodeInfo(NodeInfo { name, id: id.get() })) + .expect("Failed to send node info"); } + Message::EnumerateDevices => {} } }); @@ -207,6 +223,47 @@ fn pw_thread( move |global| match global.type_ { ObjectType::Metadata => handle_metadata(global, &state, ®istry, &proxies), + ObjectType::Node => { + if let Some(ref props) = global.props { + let mut state = state.as_ref().borrow_mut(); + let name = props + .get("node.nick") + .or(props.get("node.description")) + .unwrap_or("Unknown device"); + match props.get("media.class") { + Some("Audio/Source") => { + state.devices.insert( + global.id.into(), + NodeInfo { + name: name.to_string(), + id: global.id, + }, + ); + } + Some("Audio/Sink") => { + state.devices.insert( + global.id.into(), + NodeInfo { + name: name.to_string(), + id: global.id, + }, + ); + } + _ => {} + } + if props.get("media.class") == Some("Audio/Source") + && global.type_ == ObjectType::Node + { + println!( + "object: id:{} type:{}/{} nick:{}", + global.id, + global.type_, + global.version, + props.get("node.nick").unwrap_or("failed to get name") + ); + } + } + } _ => {} } }) @@ -248,11 +305,12 @@ fn handle_metadata( .property({ let state = state.clone(); move |_, key, _, value| { + let mut sample_rate = 0; let mut state = state.as_ref().borrow_mut(); if let Some(value) = value { if let Ok(value) = value.parse::() { match key { - Some("clock.rate") => state.settings.sample_rate = value, + Some("clock.rate") => sample_rate = value, Some("clock.quantum") => { state.settings.default_buffer_size = value } @@ -264,8 +322,25 @@ fn handle_metadata( } _ => {} }; + } else { + match key { + Some("clock.allowed-rates") => { + let rates: Result, _> = value[2..value.len() - 2] + .split_whitespace() + .map(|x| x.parse::()) + .collect(); + state.settings.allowed_sample_rates = + rates.expect("Couldn't parse allowed rates"); + } + _ => {} + } } } + // Not sure if allowed-rates can be empty, + // but if it is just push the currently used one. + if state.settings.allowed_sample_rates.is_empty() { + state.settings.allowed_sample_rates.push(sample_rate); + } 0 } }) diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs index 31daf0c8a..1abd1a526 100644 --- a/src/host/pipewire/device.rs +++ b/src/host/pipewire/device.rs @@ -1,3 +1,4 @@ +use crate::traits::DeviceTrait; use crate::{ BackendSpecificError, BuildStreamError, Data, DefaultStreamConfigError, DeviceNameError, InputCallbackInfo, OutputCallbackInfo, SampleFormat, SampleRate, StreamConfig, StreamError, @@ -6,7 +7,6 @@ use crate::{ }; use std::hash::{Hash, Hasher}; use std::rc::Rc; -use crate::traits::DeviceTrait; use super::stream::Stream; use super::PIPEWIRE_SAMPLE_FORMAT; @@ -26,12 +26,10 @@ pub enum DeviceType { } #[derive(Clone)] pub struct Device { - name: String, - sample_rate: SampleRate, - buffer_size: SupportedBufferSize, - device_type: DeviceType, - connect_ports_automatically: bool, - client: Rc + pub(crate) name: String, + pub(crate) device_type: DeviceType, + pub(crate) connect_ports_automatically: bool, + pub(crate) client: Rc, } impl Device { @@ -41,22 +39,29 @@ impl Device { device_type: DeviceType, client: Rc, ) -> Result { - while client.get_settings().and_then(|s| if s.sample_rate == 0 {Err(String::new())} else {Ok(true)} ).is_err() {} + while client + .get_settings() + .and_then(|s| { + if s.allowed_sample_rates.is_empty() { + Err(String::new()) + } else { + Ok(true) + } + }) + .is_err() + {} let settings = client.get_settings().unwrap(); - let info = client.create_device_node(name, device_type.clone(), connect_ports_automatically).expect("Error creating device"); + let info = client + .create_device_node(name, device_type.clone(), connect_ports_automatically) + .expect("Error creating device"); Ok(Device { name: info.name, - sample_rate: SampleRate(settings.sample_rate), - buffer_size: SupportedBufferSize::Range { - min: settings.min_buffer_size, - max: settings.max_buffer_size, - }, device_type, connect_ports_automatically, - client + client, }) } @@ -89,9 +94,14 @@ impl Device { } pub fn default_config(&self) -> Result { + let settings = self.client.get_settings().unwrap(); let channels = DEFAULT_NUM_CHANNELS; - let sample_rate = self.sample_rate; - let buffer_size = self.buffer_size.clone(); + // Default is highest sample rate possible + let sample_rate = SampleRate(*settings.allowed_sample_rates.last().unwrap()); + let buffer_size = SupportedBufferSize::Range { + min: settings.min_buffer_size, + max: settings.max_buffer_size, + }; // The sample format for JACK audio ports is always "32-bit float mono audio" in the current implementation. // Custom formats are allowed within JACK, but this is of niche interest. // The format can be found programmatically by calling jack::PortSpec::port_type() on a created port. @@ -105,6 +115,7 @@ impl Device { } pub fn supported_configs(&self) -> Vec { + let settings = self.client.get_settings().unwrap(); let f = match self.default_config() { Err(_) => return vec![], Ok(f) => f, @@ -115,7 +126,8 @@ impl Device { for &channels in DEFAULT_SUPPORTED_CHANNELS.iter() { supported_configs.push(SupportedStreamConfigRange { channels, - min_sample_rate: f.sample_rate, + min_sample_rate: SampleRate(*settings.allowed_sample_rates.first().unwrap()), + // Default is maximum possible, so just use that max_sample_rate: f.sample_rate, buffer_size: f.buffer_size.clone(), sample_format: f.sample_format, @@ -179,15 +191,25 @@ impl DeviceTrait for Device { D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static, { + let settings = self.client.get_settings().unwrap(); if let DeviceType::OutputDevice = &self.device_type { // Trying to create an input stream from an output device return Err(BuildStreamError::StreamConfigNotSupported); } - if conf.sample_rate != self.sample_rate || sample_format != PIPEWIRE_SAMPLE_FORMAT { + // FIXME: Not sure if we should go to the nearest neighbour sample rate + // This issue also happens on build_output_stream_raw() + if settings.allowed_sample_rates.contains(&conf.sample_rate.0) + || sample_format != PIPEWIRE_SAMPLE_FORMAT + { return Err(BuildStreamError::StreamConfigNotSupported); } - let mut stream = Stream::new_input(self.client.clone(), conf.channels, data_callback, error_callback); + let mut stream = Stream::new_input( + self.client.clone(), + conf.channels, + data_callback, + error_callback, + ); if self.connect_ports_automatically { stream.connect_to_system_inputs(); @@ -207,15 +229,23 @@ impl DeviceTrait for Device { D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static, { + let settings = self.client.get_settings().unwrap(); if let DeviceType::InputDevice = &self.device_type { // Trying to create an output stream from an input device return Err(BuildStreamError::StreamConfigNotSupported); } - if conf.sample_rate != self.sample_rate || sample_format != PIPEWIRE_SAMPLE_FORMAT { + if settings.allowed_sample_rates.contains(&conf.sample_rate.0) + || sample_format != PIPEWIRE_SAMPLE_FORMAT + { return Err(BuildStreamError::StreamConfigNotSupported); } - let mut stream = Stream::new_output(self.client.clone(), conf.channels, data_callback, error_callback); + let mut stream = Stream::new_output( + self.client.clone(), + conf.channels, + data_callback, + error_callback, + ); if self.connect_ports_automatically { stream.connect_to_system_outputs(); diff --git a/src/host/pipewire/mod.rs b/src/host/pipewire/mod.rs index c883e32a3..734030cbc 100644 --- a/src/host/pipewire/mod.rs +++ b/src/host/pipewire/mod.rs @@ -3,14 +3,14 @@ extern crate pipewire; use std::rc::Rc; use std::sync::mpsc; -use crate::{DevicesError, SampleFormat, SupportedStreamConfigRange}; use crate::traits::HostTrait; +use crate::{DevicesError, SampleFormat, SupportedStreamConfigRange}; mod device; pub use self::device::Device; pub use self::stream::Stream; -mod stream; mod conn; +mod stream; const PIPEWIRE_SAMPLE_FORMAT: SampleFormat = SampleFormat::F32; @@ -30,7 +30,7 @@ pub struct Host { /// A list of the devices that have been created from this Host. devices_created: Vec, - client: Rc + client: Rc, } impl Host { @@ -41,7 +41,7 @@ impl Host { name: "cpal_client".to_owned(), connect_ports_automatically: true, devices_created: vec![], - client + client, }; // Devices don't exist for PipeWire, they have to be created @@ -68,7 +68,7 @@ impl Host { let in_device_res = Device::default_input_device( &self.name, self.connect_ports_automatically, - self.client.clone() + self.client.clone(), ); match in_device_res { @@ -81,7 +81,7 @@ impl Host { let out_device_res = Device::default_output_device( &self.name, self.connect_ports_automatically, - self.client.clone() + self.client.clone(), ); match out_device_res { Ok(device) => self.devices_created.push(device), @@ -121,4 +121,4 @@ impl HostTrait for Host { } None } -} \ No newline at end of file +} diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs index b89434f3f..7fd4d4d04 100644 --- a/src/host/pipewire/stream.rs +++ b/src/host/pipewire/stream.rs @@ -1,8 +1,8 @@ +use crate::traits::StreamTrait; use crate::ChannelCount; use std::rc::Rc; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; -use crate::traits::StreamTrait; use crate::{ BackendSpecificError, Data, InputCallbackInfo, OutputCallbackInfo, PauseStreamError, @@ -223,7 +223,6 @@ struct LocalProcessHandler { /// No new ports are allowed to be created after the creation of the LocalProcessHandler as that would invalidate the buffer sizes // out_ports: Vec>, // in_ports: Vec>, - sample_rate: SampleRate, buffer_size: usize, input_data_callback: Option>, diff --git a/src/platform/mod.rs b/src/platform/mod.rs index 2e4e75ea2..ec35c1ea9 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -588,8 +588,8 @@ mod platform_impl { }; #[cfg(feature = "pipewire")] pub use crate::host::pipewire::{ - Device as PipeWireDevice, Devices as PipeWireDevices, Host as PipeWireHost, Stream as PipeWireStream, - SupportedInputConfigs as PipeWireSupportedInputConfigs, + Device as PipeWireDevice, Devices as PipeWireDevices, Host as PipeWireHost, + Stream as PipeWireStream, SupportedInputConfigs as PipeWireSupportedInputConfigs, SupportedOutputConfigs as PipeWireupportedOutputConfigs, }; From 739b6231133349a420d0c851bf5a60f5acad6c45 Mon Sep 17 00:00:00 2001 From: mbodmer Date: Sat, 27 Aug 2022 13:52:12 +0200 Subject: [PATCH 09/10] pipewire: update beep example to support pipewire Signed-off-by: mbodmer --- examples/beep.rs | 41 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/examples/beep.rs b/examples/beep.rs index e0266aa79..e068fd9c8 100644 --- a/examples/beep.rs +++ b/examples/beep.rs @@ -25,12 +25,26 @@ struct Opt { #[arg(short, long)] #[allow(dead_code)] jack: bool, + /// Use the Pipewire host + #[cfg(all( + any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd" + ), + feature = "pipewire" + ))] + #[allow(dead_code)] + pipewire: bool, } fn main() -> anyhow::Result<()> { let opt = Opt::parse(); // Conditionally compile with jack if the feature is specified. + // Manually check for flags. Can be passed through cargo with -- e.g. + // cargo run --release --example beep --features jack -- --jack #[cfg(all( any( target_os = "linux", @@ -40,8 +54,6 @@ fn main() -> anyhow::Result<()> { ), feature = "jack" ))] - // Manually check for flags. Can be passed through cargo with -- e.g. - // cargo run --release --example beep --features jack -- --jack let host = if opt.jack { cpal::host_from_id(cpal::available_hosts() .into_iter() @@ -53,6 +65,29 @@ fn main() -> anyhow::Result<()> { cpal::default_host() }; + // Conditionally compile with PipeWire if the feature is specified. + #[cfg(all( + any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd" + ), + feature = "pipewire" + ))] + // Manually check for flags. Can be passed through cargo with -- e.g. + // cargo run --release --example beep --features pipewire -- --pipewire + let host = if opt.pipewire { + cpal::host_from_id(cpal::available_hosts() + .into_iter() + .find(|id| *id == cpal::HostId::PipeWire) + .expect( + "make sure --features pipewire is specified. only works on OSes where PipeWire is available", + )).expect("PipeWire host unavailable") + } else { + cpal::default_host() + }; + #[cfg(any( not(any( target_os = "linux", @@ -60,7 +95,7 @@ fn main() -> anyhow::Result<()> { target_os = "freebsd", target_os = "netbsd" )), - not(feature = "jack") + not(any(feature = "jack", feature = "pipewire")) ))] let host = cpal::default_host(); From 0a3341347d076e6c5f7478872222ccd7710d7260 Mon Sep 17 00:00:00 2001 From: ImUrX Date: Sat, 28 Jan 2023 18:59:42 -0300 Subject: [PATCH 10/10] newline --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index e663a09d1..1dc6c0c2b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -86,4 +86,4 @@ name = "feedback" name = "record_wav" [[example]] -name = "synth_tones" \ No newline at end of file +name = "synth_tones"