diff --git a/Cargo.toml b/Cargo.toml index df4079ca7..1dc6c0c2b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,8 @@ alsa = "0.6" libc = "0.2" parking_lot = "0.12" jack = { version = "0.10", optional = true } +intmap = "2.0" +pipewire = { git = "https://gitlab.freedesktop.org/pipewire/pipewire-rs", optional = true } [target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies] core-foundation-sys = "0.8.2" # For linking to CoreFoundation.framework and handling device name `CFString`s. diff --git a/examples/beep.rs b/examples/beep.rs index e0266aa79..e068fd9c8 100644 --- a/examples/beep.rs +++ b/examples/beep.rs @@ -25,12 +25,26 @@ struct Opt { #[arg(short, long)] #[allow(dead_code)] jack: bool, + /// Use the Pipewire host + #[cfg(all( + any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd" + ), + feature = "pipewire" + ))] + #[allow(dead_code)] + pipewire: bool, } fn main() -> anyhow::Result<()> { let opt = Opt::parse(); // Conditionally compile with jack if the feature is specified. + // Manually check for flags. Can be passed through cargo with -- e.g. + // cargo run --release --example beep --features jack -- --jack #[cfg(all( any( target_os = "linux", @@ -40,8 +54,6 @@ fn main() -> anyhow::Result<()> { ), feature = "jack" ))] - // Manually check for flags. Can be passed through cargo with -- e.g. - // cargo run --release --example beep --features jack -- --jack let host = if opt.jack { cpal::host_from_id(cpal::available_hosts() .into_iter() @@ -53,6 +65,29 @@ fn main() -> anyhow::Result<()> { cpal::default_host() }; + // Conditionally compile with PipeWire if the feature is specified. + #[cfg(all( + any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd" + ), + feature = "pipewire" + ))] + // Manually check for flags. Can be passed through cargo with -- e.g. + // cargo run --release --example beep --features pipewire -- --pipewire + let host = if opt.pipewire { + cpal::host_from_id(cpal::available_hosts() + .into_iter() + .find(|id| *id == cpal::HostId::PipeWire) + .expect( + "make sure --features pipewire is specified. only works on OSes where PipeWire is available", + )).expect("PipeWire host unavailable") + } else { + cpal::default_host() + }; + #[cfg(any( not(any( target_os = "linux", @@ -60,7 +95,7 @@ fn main() -> anyhow::Result<()> { target_os = "freebsd", target_os = "netbsd" )), - not(feature = "jack") + not(any(feature = "jack", feature = "pipewire")) ))] let host = cpal::default_host(); diff --git a/src/host/mod.rs b/src/host/mod.rs index 8de06cbe0..80d735083 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -24,6 +24,11 @@ pub(crate) mod jack; pub(crate) mod null; #[cfg(target_os = "android")] pub(crate) mod oboe; +#[cfg(all( + any(target_os = "linux", target_os = "dragonfly", target_os = "freebsd"), + feature = "pipewire" +))] +pub(crate) mod pipewire; #[cfg(windows)] pub(crate) mod wasapi; #[cfg(all(target_arch = "wasm32", feature = "wasm-bindgen"))] diff --git a/src/host/pipewire/conn.rs b/src/host/pipewire/conn.rs new file mode 100644 index 000000000..685655939 --- /dev/null +++ b/src/host/pipewire/conn.rs @@ -0,0 +1,359 @@ +extern crate pipewire; + +use intmap::IntMap; + +use super::Device; + +use self::pipewire::{ + metadata::{Metadata, MetadataListener}, + node::{Node, NodeListener}, + prelude::*, + proxy::Listener, + registry::{GlobalObject, Registry}, + spa::{Direction, ForeignDict}, + types::ObjectType, + Core, MainLoop, +}; + +use std::{ + borrow::BorrowMut, + cell::{Cell, RefCell}, + collections::HashMap, + rc::Rc, + sync::mpsc, + thread, + time::Duration, +}; + +use super::device::DeviceType; + +#[derive(Debug)] +enum Message { + Terminate, + GetSettings, + CreateDeviceNode { + name: String, + device_type: DeviceType, + autoconnect: bool, + }, + EnumerateDevices, +} + +enum MessageRepl { + Settings(Settings), + NodeInfo(NodeInfo), +} + +pub struct NodeInfo { + pub name: String, + pub id: u32, +} + +pub struct PWClient { + pw_sender: pipewire::channel::Sender, + main_receiver: mpsc::Receiver, +} + +impl PWClient { + pub fn new() -> Self { + let (main_sender, main_receiver) = mpsc::channel(); + let (pw_sender, pw_receiver) = pipewire::channel::channel(); + + let _pw_thread = thread::spawn(move || pw_thread(main_sender, pw_receiver)); + + Self { + pw_sender, + main_receiver, + } + } + + pub fn get_settings(&self) -> Result { + match self.pw_sender.send(Message::GetSettings) { + Ok(_) => match self.main_receiver.recv() { + Ok(MessageRepl::Settings(settings)) => Ok(settings), + Err(err) => Err(format!("{:?}", err)), + _ => Err(format!("")), + }, + Err(err) => Err(format!("{:?}", err)), + } + } + + pub fn create_device_node( + &self, + name: String, + device_type: DeviceType, + connect_ports_automatically: bool, + ) -> Result { + match self.pw_sender.send(Message::CreateDeviceNode { + name, + device_type, + autoconnect: connect_ports_automatically, + }) { + Ok(_) => match self.main_receiver.recv() { + Ok(MessageRepl::NodeInfo(info)) => Ok(info), + Err(err) => Err(format!("{:?}", err)), + _ => Err(format!("")), + }, + Err(err) => Err(format!("{:?}", err)), + } + } +} + +#[derive(Default)] +struct State { + settings: Settings, + nodes: Vec, + devices: IntMap, +} + +#[derive(Default, Clone, Debug)] +pub struct Settings { + pub allowed_sample_rates: Vec, + pub min_buffer_size: u32, + pub max_buffer_size: u32, + pub default_buffer_size: u32, +} + +enum ProxyItem { + Metadata { + _proxy: Metadata, + _listener: MetadataListener, + }, + Node { + _proxy: Node, + _listener: NodeListener, + }, +} + +fn pw_thread( + main_sender: mpsc::Sender, + pw_receiver: pipewire::channel::Receiver, +) { + pipewire::init(); + // let state = Rc::new(State::default()); + let state = Rc::new(RefCell::new(State::default())); + let proxies = Rc::new(RefCell::new(HashMap::new())); + + let mainloop = pipewire::MainLoop::new().expect("Failed to create PipeWire Mainloop"); + + let context = pipewire::Context::new(&mainloop).expect("Failed to create PipeWire Context"); + let core = Rc::new( + context + .connect(None) + .expect("Failed to connect to PipeWire"), + ); + let registry = Rc::new(core.get_registry().expect("Failed to get Registry")); + + let _receiver = pw_receiver.attach(&mainloop, { + let mainloop = mainloop.clone(); + let state = state.clone(); + let main_sender = main_sender.clone(); + let core = core.clone(); + let proxies = proxies.clone(); + + move |msg| match msg { + Message::Terminate => mainloop.quit(), + Message::GetSettings => { + let settings = state.borrow().settings.clone(); + main_sender + .send(MessageRepl::Settings(settings)) + .expect("Failed to send settings"); + } + Message::CreateDeviceNode { + name, + device_type, + autoconnect, + } => { + let node: Node = core + .create_object( + "adapter", //node_factory.get().expect("No node factory found"), + &pipewire::properties! { + *pipewire::keys::NODE_NAME => name.clone(), + *pipewire::keys::FACTORY_NAME => "support.null-audio-sink", + *pipewire::keys::MEDIA_TYPE => "Audio", + *pipewire::keys::MEDIA_CATEGORY => match device_type { + DeviceType::InputDevice => "Capture", + DeviceType::OutputDevice => "Playback" + }, + *pipewire::keys::NODE_AUTOCONNECT => match autoconnect { + false => "false", + true => "true", + }, + // Don't remove the object on the remote when we destroy our proxy. + // *pipewire::keys::OBJECT_LINGER => "1" + }, + ) + .expect("Failed to create object"); + + let id = Rc::new(Cell::new(0)); + let id_clone = id.clone(); + let _listener = node + .add_listener_local() + .info(move |info| { + id_clone.set(info.id()); + }) + .param(|a, b, c, d| { + println!("{}, {}, {}, {}", a, b, c, d); + }) + .register(); + println!("{:?}", node); + while id.get() == 0 { + mainloop.run(); + } + + state.as_ref().borrow_mut().nodes.push(ProxyItem::Node { + _proxy: node, + _listener, + }); + + main_sender + .send(MessageRepl::NodeInfo(NodeInfo { name, id: id.get() })) + .expect("Failed to send node info"); + } + Message::EnumerateDevices => {} + } + }); + + let _reg_listener = registry + .add_listener_local() + .global({ + let state = state.clone(); + let registry = registry.clone(); + let proxies = proxies.clone(); + + move |global| match global.type_ { + ObjectType::Metadata => handle_metadata(global, &state, ®istry, &proxies), + ObjectType::Node => { + if let Some(ref props) = global.props { + let mut state = state.as_ref().borrow_mut(); + let name = props + .get("node.nick") + .or(props.get("node.description")) + .unwrap_or("Unknown device"); + match props.get("media.class") { + Some("Audio/Source") => { + state.devices.insert( + global.id.into(), + NodeInfo { + name: name.to_string(), + id: global.id, + }, + ); + } + Some("Audio/Sink") => { + state.devices.insert( + global.id.into(), + NodeInfo { + name: name.to_string(), + id: global.id, + }, + ); + } + _ => {} + } + if props.get("media.class") == Some("Audio/Source") + && global.type_ == ObjectType::Node + { + println!( + "object: id:{} type:{}/{} nick:{}", + global.id, + global.type_, + global.version, + props.get("node.nick").unwrap_or("failed to get name") + ); + } + } + } + _ => {} + } + }) + .register(); + + // let timer = mainloop.add_timer({ + // move |_| { + // } + // }); + + // timer + // .update_timer( + // Some(Duration::from_millis(500)), + // Some(Duration::from_secs(1)), + // ) + // .into_result() + // .expect("FU"); + + mainloop.run(); +} + +fn handle_metadata( + metadata: &GlobalObject, + state: &Rc>, + registry: &Rc, + proxies: &Rc>>, +) { + let props = metadata + .props + .as_ref() + .expect("Metadata object is missing properties"); + + match props.get("metadata.name") { + Some("settings") => { + let settings: Metadata = registry.bind(metadata).expect("Metadata"); + + let _listener = settings + .add_listener_local() + .property({ + let state = state.clone(); + move |_, key, _, value| { + let mut sample_rate = 0; + let mut state = state.as_ref().borrow_mut(); + if let Some(value) = value { + if let Ok(value) = value.parse::() { + match key { + Some("clock.rate") => sample_rate = value, + Some("clock.quantum") => { + state.settings.default_buffer_size = value + } + Some("clock.min-quantum") => { + state.settings.min_buffer_size = value + } + Some("clock.max-quantum") => { + state.settings.max_buffer_size = value + } + _ => {} + }; + } else { + match key { + Some("clock.allowed-rates") => { + let rates: Result, _> = value[2..value.len() - 2] + .split_whitespace() + .map(|x| x.parse::()) + .collect(); + state.settings.allowed_sample_rates = + rates.expect("Couldn't parse allowed rates"); + } + _ => {} + } + } + } + // Not sure if allowed-rates can be empty, + // but if it is just push the currently used one. + if state.settings.allowed_sample_rates.is_empty() { + state.settings.allowed_sample_rates.push(sample_rate); + } + 0 + } + }) + .register(); + + proxies.as_ref().borrow_mut().insert( + metadata.id, + ProxyItem::Metadata { + _proxy: settings, + _listener, + }, + ); + } + _ => {} + }; +} diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs new file mode 100644 index 000000000..1abd1a526 --- /dev/null +++ b/src/host/pipewire/device.rs @@ -0,0 +1,271 @@ +use crate::traits::DeviceTrait; +use crate::{ + BackendSpecificError, BuildStreamError, Data, DefaultStreamConfigError, DeviceNameError, + InputCallbackInfo, OutputCallbackInfo, SampleFormat, SampleRate, StreamConfig, StreamError, + SupportedBufferSize, SupportedStreamConfig, SupportedStreamConfigRange, + SupportedStreamConfigsError, +}; +use std::hash::{Hash, Hasher}; +use std::rc::Rc; + +use super::stream::Stream; +use super::PIPEWIRE_SAMPLE_FORMAT; + +pub type SupportedInputConfigs = std::vec::IntoIter; +pub type SupportedOutputConfigs = std::vec::IntoIter; + +const DEFAULT_NUM_CHANNELS: u16 = 2; +const DEFAULT_SUPPORTED_CHANNELS: [u16; 10] = [1, 2, 4, 6, 8, 16, 24, 32, 48, 64]; + +/// If a device is for input or output. +/// Until we have duplex stream support PipeWire nodes and CPAL devices for PipeWire will be either input or output. +#[derive(Clone, Debug)] +pub enum DeviceType { + InputDevice, + OutputDevice, +} +#[derive(Clone)] +pub struct Device { + pub(crate) name: String, + pub(crate) device_type: DeviceType, + pub(crate) connect_ports_automatically: bool, + pub(crate) client: Rc, +} + +impl Device { + fn new_device( + name: String, + connect_ports_automatically: bool, + device_type: DeviceType, + client: Rc, + ) -> Result { + while client + .get_settings() + .and_then(|s| { + if s.allowed_sample_rates.is_empty() { + Err(String::new()) + } else { + Ok(true) + } + }) + .is_err() + {} + + let settings = client.get_settings().unwrap(); + + let info = client + .create_device_node(name, device_type.clone(), connect_ports_automatically) + .expect("Error creating device"); + + Ok(Device { + name: info.name, + device_type, + connect_ports_automatically, + client, + }) + } + + pub fn default_output_device( + name: &str, + connect_ports_automatically: bool, + client: Rc, + ) -> Result { + let output_client_name = format!("{}_out", name); + Device::new_device( + output_client_name, + connect_ports_automatically, + DeviceType::OutputDevice, + client, + ) + } + + pub fn default_input_device( + name: &str, + connect_ports_automatically: bool, + client: Rc, + ) -> Result { + let input_client_name = format!("{}_in", name); + Device::new_device( + input_client_name, + connect_ports_automatically, + DeviceType::InputDevice, + client, + ) + } + + pub fn default_config(&self) -> Result { + let settings = self.client.get_settings().unwrap(); + let channels = DEFAULT_NUM_CHANNELS; + // Default is highest sample rate possible + let sample_rate = SampleRate(*settings.allowed_sample_rates.last().unwrap()); + let buffer_size = SupportedBufferSize::Range { + min: settings.min_buffer_size, + max: settings.max_buffer_size, + }; + // The sample format for JACK audio ports is always "32-bit float mono audio" in the current implementation. + // Custom formats are allowed within JACK, but this is of niche interest. + // The format can be found programmatically by calling jack::PortSpec::port_type() on a created port. + let sample_format = PIPEWIRE_SAMPLE_FORMAT; + Ok(SupportedStreamConfig { + channels, + sample_rate, + buffer_size, + sample_format, + }) + } + + pub fn supported_configs(&self) -> Vec { + let settings = self.client.get_settings().unwrap(); + let f = match self.default_config() { + Err(_) => return vec![], + Ok(f) => f, + }; + + let mut supported_configs = vec![]; + + for &channels in DEFAULT_SUPPORTED_CHANNELS.iter() { + supported_configs.push(SupportedStreamConfigRange { + channels, + min_sample_rate: SampleRate(*settings.allowed_sample_rates.first().unwrap()), + // Default is maximum possible, so just use that + max_sample_rate: f.sample_rate, + buffer_size: f.buffer_size.clone(), + sample_format: f.sample_format, + }); + } + supported_configs + } + + pub fn is_input(&self) -> bool { + matches!(self.device_type, DeviceType::InputDevice) + } + + pub fn is_output(&self) -> bool { + matches!(self.device_type, DeviceType::OutputDevice) + } +} + +impl DeviceTrait for Device { + type SupportedInputConfigs = SupportedInputConfigs; + type SupportedOutputConfigs = SupportedOutputConfigs; + type Stream = Stream; + + fn name(&self) -> Result { + Ok(self.name.clone()) + } + + fn supported_input_configs( + &self, + ) -> Result { + Ok(self.supported_configs().into_iter()) + } + + fn supported_output_configs( + &self, + ) -> Result { + Ok(self.supported_configs().into_iter()) + } + + /// Returns the default input config + /// The sample format for JACK audio ports is always "32-bit float mono audio" unless using a custom type. + /// The sample rate is set by the JACK server. + fn default_input_config(&self) -> Result { + self.default_config() + } + + /// Returns the default output config + /// The sample format for JACK audio ports is always "32-bit float mono audio" unless using a custom type. + /// The sample rate is set by the JACK server. + fn default_output_config(&self) -> Result { + self.default_config() + } + + fn build_input_stream_raw( + &self, + conf: &StreamConfig, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, + ) -> Result + where + D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + let settings = self.client.get_settings().unwrap(); + if let DeviceType::OutputDevice = &self.device_type { + // Trying to create an input stream from an output device + return Err(BuildStreamError::StreamConfigNotSupported); + } + // FIXME: Not sure if we should go to the nearest neighbour sample rate + // This issue also happens on build_output_stream_raw() + if settings.allowed_sample_rates.contains(&conf.sample_rate.0) + || sample_format != PIPEWIRE_SAMPLE_FORMAT + { + return Err(BuildStreamError::StreamConfigNotSupported); + } + + let mut stream = Stream::new_input( + self.client.clone(), + conf.channels, + data_callback, + error_callback, + ); + + if self.connect_ports_automatically { + stream.connect_to_system_inputs(); + } + + Ok(stream) + } + + fn build_output_stream_raw( + &self, + conf: &StreamConfig, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, + ) -> Result + where + D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + let settings = self.client.get_settings().unwrap(); + if let DeviceType::InputDevice = &self.device_type { + // Trying to create an output stream from an input device + return Err(BuildStreamError::StreamConfigNotSupported); + } + if settings.allowed_sample_rates.contains(&conf.sample_rate.0) + || sample_format != PIPEWIRE_SAMPLE_FORMAT + { + return Err(BuildStreamError::StreamConfigNotSupported); + } + + let mut stream = Stream::new_output( + self.client.clone(), + conf.channels, + data_callback, + error_callback, + ); + + if self.connect_ports_automatically { + stream.connect_to_system_outputs(); + } + + Ok(stream) + } +} + +impl PartialEq for Device { + fn eq(&self, other: &Self) -> bool { + // Device::name() can never fail in this implementation + self.name().unwrap() == other.name().unwrap() + } +} + +impl Eq for Device {} + +impl Hash for Device { + fn hash(&self, state: &mut H) { + self.name().unwrap().hash(state); + } +} diff --git a/src/host/pipewire/mod.rs b/src/host/pipewire/mod.rs new file mode 100644 index 000000000..734030cbc --- /dev/null +++ b/src/host/pipewire/mod.rs @@ -0,0 +1,124 @@ +extern crate pipewire; + +use std::rc::Rc; +use std::sync::mpsc; + +use crate::traits::HostTrait; +use crate::{DevicesError, SampleFormat, SupportedStreamConfigRange}; + +mod device; +pub use self::device::Device; +pub use self::stream::Stream; +mod conn; +mod stream; + +const PIPEWIRE_SAMPLE_FORMAT: SampleFormat = SampleFormat::F32; + +pub type SupportedInputConfigs = std::vec::IntoIter; +pub type SupportedOutputConfigs = std::vec::IntoIter; +pub type Devices = std::vec::IntoIter; + +/// The PipeWire Host type + +pub struct Host { + /// The name that the client will have in PipeWire. + /// Until we have duplex streams two clients will be created adding "out" or "in" to the name + /// since names have to be unique. + name: String, + /// If ports are to be connected to the system (soundcard) ports automatically (default is true). + connect_ports_automatically: bool, + /// A list of the devices that have been created from this Host. + devices_created: Vec, + + client: Rc, +} + +impl Host { + pub fn new() -> Result { + let client = Rc::new(conn::PWClient::new()); + + let mut host = Host { + name: "cpal_client".to_owned(), + connect_ports_automatically: true, + devices_created: vec![], + client, + }; + + // Devices don't exist for PipeWire, they have to be created + host.initialize_default_devices(); + Ok(host) + } + /// Set whether the ports should automatically be connected to system + /// (default is true) + pub fn set_connect_automatically(&mut self, do_connect: bool) { + self.connect_ports_automatically = do_connect; + } + + pub fn input_device_with_name(&mut self, name: &str) -> Option { + self.name = name.to_owned(); + self.default_input_device() + } + + pub fn output_device_with_name(&mut self, name: &str) -> Option { + self.name = name.to_owned(); + self.default_output_device() + } + + fn initialize_default_devices(&mut self) { + let in_device_res = Device::default_input_device( + &self.name, + self.connect_ports_automatically, + self.client.clone(), + ); + + match in_device_res { + Ok(device) => self.devices_created.push(device), + Err(err) => { + println!("{}", err); + } + } + + let out_device_res = Device::default_output_device( + &self.name, + self.connect_ports_automatically, + self.client.clone(), + ); + match out_device_res { + Ok(device) => self.devices_created.push(device), + Err(err) => { + println!("{}", err); + } + } + } +} + +impl HostTrait for Host { + type Devices = Devices; + type Device = Device; + + fn is_available() -> bool { + true + } + + fn devices(&self) -> Result { + Ok(self.devices_created.clone().into_iter()) + } + + fn default_input_device(&self) -> Option { + for device in &self.devices_created { + if device.is_input() { + return Some(device.clone()); + } + } + None + } + + fn default_output_device(&self) -> Option { + for device in &self.devices_created { + if device.is_output() { + return Some(device.clone()); + } + } + None + } +} diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs new file mode 100644 index 000000000..7fd4d4d04 --- /dev/null +++ b/src/host/pipewire/stream.rs @@ -0,0 +1,463 @@ +use crate::traits::StreamTrait; +use crate::ChannelCount; +use std::rc::Rc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; + +use crate::{ + BackendSpecificError, Data, InputCallbackInfo, OutputCallbackInfo, PauseStreamError, + PlayStreamError, SampleRate, StreamError, +}; + +use super::PIPEWIRE_SAMPLE_FORMAT; + +type ErrorCallbackPtr = Arc>; + +pub struct Stream { + // TODO: It might be faster to send a message when playing/pausing than to check this every iteration + playing: Arc, + // async_client: jack::AsyncClient, + // Port names are stored in order to connect them to other ports in jack automatically + // input_port_names: Vec, + // output_port_names: Vec, +} + +impl Stream { + // TODO: Return error messages + pub fn new_input( + client: Rc, + channels: ChannelCount, + data_callback: D, + mut error_callback: E, + ) -> Stream + where + D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + // let mut ports = vec![]; + // let mut port_names: Vec = vec![]; + // Create ports + // for i in 0..channels { + // let port_try = client.register_port(&format!("in_{}", i), jack::AudioIn::default()); + // match port_try { + // Ok(port) => { + // // Get the port name in order to later connect it automatically + // if let Ok(port_name) = port.name() { + // port_names.push(port_name); + // } + // // Store the port into a Vec to move to the ProcessHandler + // ports.push(port); + // } + // Err(e) => { + // // If port creation failed, send the error back via the error_callback + // error_callback( + // BackendSpecificError { + // description: e.to_string(), + // } + // .into(), + // ); + // } + // } + // } + + let playing = Arc::new(AtomicBool::new(true)); + + let error_callback_ptr = Arc::new(Mutex::new(error_callback)) as ErrorCallbackPtr; + + // let input_process_handler = LocalProcessHandler::new( + // vec![], + // ports, + // SampleRate(client.sample_rate() as u32), + // client.buffer_size() as usize, + // Some(Box::new(data_callback)), + // None, + // playing.clone(), + // Arc::clone(&error_callback_ptr), + // ); + + let notification_handler = JackNotificationHandler::new(error_callback_ptr); + + // let async_client = client + // .activate_async(notification_handler, input_process_handler) + // .unwrap(); + + Stream { + playing, + // async_client, + // input_port_names: port_names, + // output_port_names: vec![], + } + } + + pub fn new_output( + client: Rc, + channels: ChannelCount, + data_callback: D, + mut error_callback: E, + ) -> Stream + where + D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + // let mut ports = vec![]; + // let mut port_names: Vec = vec![]; + // // Create ports + // for i in 0..channels { + // let port_try = client.register_port(&format!("out_{}", i), jack::AudioOut::default()); + // match port_try { + // Ok(port) => { + // // Get the port name in order to later connect it automatically + // if let Ok(port_name) = port.name() { + // port_names.push(port_name); + // } + // // Store the port into a Vec to move to the ProcessHandler + // ports.push(port); + // } + // Err(e) => { + // // If port creation failed, send the error back via the error_callback + // error_callback( + // BackendSpecificError { + // description: e.to_string(), + // } + // .into(), + // ); + // } + // } + // } + + let playing = Arc::new(AtomicBool::new(true)); + + let error_callback_ptr = Arc::new(Mutex::new(error_callback)) as ErrorCallbackPtr; + + // let output_process_handler = LocalProcessHandler::new( + // ports, + // vec![], + // SampleRate(client.sample_rate() as u32), + // client.buffer_size() as usize, + // None, + // Some(Box::new(data_callback)), + // playing.clone(), + // Arc::clone(&error_callback_ptr), + // ); + + // let notification_handler = JackNotificationHandler::new(error_callback_ptr); + + // let async_client = client + // .activate_async(notification_handler, output_process_handler) + // .unwrap(); + + Stream { + playing, + // async_client, + // input_port_names: vec![], + // output_port_names: port_names, + } + } + + /// Connect to the standard system outputs in jack, system:playback_1 and system:playback_2 + /// This has to be done after the client is activated, doing it just after creating the ports doesn't work. + pub fn connect_to_system_outputs(&mut self) { + // Get the system ports + // let system_ports = self.async_client.as_client().ports( + // Some("system:playback_.*"), + // None, + // jack::PortFlags::empty(), + // ); + + // // Connect outputs from this client to the system playback inputs + // for i in 0..self.output_port_names.len() { + // if i >= system_ports.len() { + // break; + // } + // match self + // .async_client + // .as_client() + // .connect_ports_by_name(&self.output_port_names[i], &system_ports[i]) + // { + // Ok(_) => (), + // Err(e) => println!("Unable to connect to port with error {}", e), + // } + // } + } + + /// Connect to the standard system outputs in jack, system:capture_1 and system:capture_2 + /// This has to be done after the client is activated, doing it just after creating the ports doesn't work. + pub fn connect_to_system_inputs(&mut self) { + // Get the system ports + // let system_ports = self.async_client.as_client().ports( + // Some("system:capture_.*"), + // None, + // jack::PortFlags::empty(), + // ); + + // // Connect outputs from this client to the system playback inputs + // for i in 0..self.input_port_names.len() { + // if i >= system_ports.len() { + // break; + // } + // match self + // .async_client + // .as_client() + // .connect_ports_by_name(&system_ports[i], &self.input_port_names[i]) + // { + // Ok(_) => (), + // Err(e) => println!("Unable to connect to port with error {}", e), + // } + // } + } +} + +impl StreamTrait for Stream { + fn play(&self) -> Result<(), PlayStreamError> { + self.playing.store(true, Ordering::SeqCst); + Ok(()) + } + + fn pause(&self) -> Result<(), PauseStreamError> { + self.playing.store(false, Ordering::SeqCst); + Ok(()) + } +} + +struct LocalProcessHandler { + /// No new ports are allowed to be created after the creation of the LocalProcessHandler as that would invalidate the buffer sizes + // out_ports: Vec>, + // in_ports: Vec>, + sample_rate: SampleRate, + buffer_size: usize, + input_data_callback: Option>, + output_data_callback: Option>, + + // JACK audio samples are 32-bit float (unless you do some custom dark magic) + // temp_input_buffer: Vec, + // temp_output_buffer: Vec, + playing: Arc, + creation_timestamp: std::time::Instant, + /// This should not be called on `process`, only on `buffer_size` because it can block. + error_callback_ptr: ErrorCallbackPtr, +} + +impl LocalProcessHandler { + fn new( + // out_ports: Vec>, + // in_ports: Vec>, + sample_rate: SampleRate, + buffer_size: usize, + input_data_callback: Option>, + output_data_callback: Option< + Box, + >, + playing: Arc, + error_callback_ptr: ErrorCallbackPtr, + ) -> Self { + // These may be reallocated in the `buffer_size` callback. + // let temp_input_buffer = vec![0.0; in_ports.len() * buffer_size]; + // let temp_output_buffer = vec![0.0; out_ports.len() * buffer_size]; + + LocalProcessHandler { + // out_ports, + // in_ports, + sample_rate, + buffer_size, + input_data_callback, + output_data_callback, + // temp_input_buffer, + // temp_output_buffer, + playing, + creation_timestamp: std::time::Instant::now(), + error_callback_ptr, + } + } +} + +fn temp_buffer_to_data(temp_input_buffer: &mut Vec, total_buffer_size: usize) -> Data { + let slice = &temp_input_buffer[0..total_buffer_size]; + let data = slice.as_ptr() as *mut (); + let len = total_buffer_size; + let data = unsafe { Data::from_parts(data, len, PIPEWIRE_SAMPLE_FORMAT) }; + data +} + +// impl jack::ProcessHandler for LocalProcessHandler { +// fn process(&mut self, _: &jack::Client, process_scope: &jack::ProcessScope) -> jack::Control { +// if !self.playing.load(Ordering::SeqCst) { +// return jack::Control::Continue; +// } + +// // This should be equal to self.buffer_size, but the implementation will +// // work even if it is less. Will panic in `temp_buffer_to_data` if greater. +// let current_frame_count = process_scope.n_frames() as usize; + +// // Get timestamp data +// let cycle_times = process_scope.cycle_times(); +// let current_start_usecs = match cycle_times { +// Ok(times) => times.current_usecs, +// Err(_) => { +// // jack was unable to get the current time information +// // Fall back to using Instants +// let now = std::time::Instant::now(); +// let duration = now.duration_since(self.creation_timestamp); +// duration.as_micros() as u64 +// } +// }; +// let start_cycle_instant = micros_to_stream_instant(current_start_usecs); +// let start_callback_instant = start_cycle_instant +// .add(frames_to_duration( +// process_scope.frames_since_cycle_start() as usize, +// self.sample_rate, +// )) +// .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + +// if let Some(input_callback) = &mut self.input_data_callback { +// // Let's get the data from the input ports and run the callback + +// let num_in_channels = self.in_ports.len(); + +// // Read the data from the input ports into the temporary buffer +// // Go through every channel and store its data in the temporary input buffer +// for ch_ix in 0..num_in_channels { +// let input_channel = &self.in_ports[ch_ix].as_slice(process_scope); +// for i in 0..current_frame_count { +// self.temp_input_buffer[ch_ix + i * num_in_channels] = input_channel[i]; +// } +// } +// // Create a slice of exactly current_frame_count frames +// let data = temp_buffer_to_data( +// &mut self.temp_input_buffer, +// current_frame_count * num_in_channels, +// ); +// // Create timestamp +// let frames_since_cycle_start = process_scope.frames_since_cycle_start() as usize; +// let duration_since_cycle_start = +// frames_to_duration(frames_since_cycle_start, self.sample_rate); +// let callback = start_callback_instant +// .add(duration_since_cycle_start) +// .expect("`playback` occurs beyond representation supported by `StreamInstant`"); +// let capture = start_callback_instant; +// let timestamp = crate::InputStreamTimestamp { callback, capture }; +// let info = crate::InputCallbackInfo { timestamp }; +// input_callback(&data, &info); +// } + +// if let Some(output_callback) = &mut self.output_data_callback { +// let num_out_channels = self.out_ports.len(); + +// // Create a slice of exactly current_frame_count frames +// let mut data = temp_buffer_to_data( +// &mut self.temp_output_buffer, +// current_frame_count * num_out_channels, +// ); +// // Create timestamp +// let frames_since_cycle_start = process_scope.frames_since_cycle_start() as usize; +// let duration_since_cycle_start = +// frames_to_duration(frames_since_cycle_start, self.sample_rate); +// let callback = start_callback_instant +// .add(duration_since_cycle_start) +// .expect("`playback` occurs beyond representation supported by `StreamInstant`"); +// let buffer_duration = frames_to_duration(current_frame_count, self.sample_rate); +// let playback = start_cycle_instant +// .add(buffer_duration) +// .expect("`playback` occurs beyond representation supported by `StreamInstant`"); +// let timestamp = crate::OutputStreamTimestamp { callback, playback }; +// let info = crate::OutputCallbackInfo { timestamp }; +// output_callback(&mut data, &info); + +// // Deinterlace +// for ch_ix in 0..num_out_channels { +// let output_channel = &mut self.out_ports[ch_ix].as_mut_slice(process_scope); +// for i in 0..current_frame_count { +// output_channel[i] = self.temp_output_buffer[ch_ix + i * num_out_channels]; +// } +// } +// } + +// // Continue as normal +// jack::Control::Continue +// } + +// fn buffer_size(&mut self, _: &jack::Client, size: jack::Frames) -> jack::Control { +// // The `buffer_size` callback is actually called on the process thread, but +// // it does not need to be suitable for real-time use. Thus we can simply allocate +// // new buffers here. It is also fine to call the error callback. +// // Details: https://github.com/RustAudio/rust-jack/issues/137 +// let new_size = size as usize; +// if new_size != self.buffer_size { +// self.buffer_size = new_size; +// self.temp_input_buffer = vec![0.0; self.in_ports.len() * new_size]; +// self.temp_output_buffer = vec![0.0; self.out_ports.len() * new_size]; +// let description = format!("buffer size changed to: {}", new_size); +// if let Ok(mut mutex_guard) = self.error_callback_ptr.lock() { +// let err = &mut *mutex_guard; +// err(BackendSpecificError { description }.into()); +// } +// } + +// jack::Control::Continue +// } +// } + +fn micros_to_stream_instant(micros: u64) -> crate::StreamInstant { + let nanos = micros * 1000; + let secs = micros / 1_000_000; + let subsec_nanos = nanos - secs * 1_000_000_000; + crate::StreamInstant::new(secs as i64, subsec_nanos as u32) +} + +// Convert the given duration in frames at the given sample rate to a `std::time::Duration`. +fn frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration { + let secsf = frames as f64 / rate.0 as f64; + let secs = secsf as u64; + let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32; + std::time::Duration::new(secs, nanos) +} + +/// Receives notifications from the JACK server. It is unclear if this may be run concurrent with itself under JACK2 specs +/// so it needs to be Sync. +struct JackNotificationHandler { + error_callback_ptr: ErrorCallbackPtr, + init_sample_rate_flag: Arc, +} + +impl JackNotificationHandler { + pub fn new(error_callback_ptr: ErrorCallbackPtr) -> Self { + JackNotificationHandler { + error_callback_ptr, + init_sample_rate_flag: Arc::new(AtomicBool::new(false)), + } + } + + fn send_error(&mut self, description: String) { + // This thread isn't the audio thread, it's fine to block + if let Ok(mut mutex_guard) = self.error_callback_ptr.lock() { + let err = &mut *mutex_guard; + err(BackendSpecificError { description }.into()); + } + } +} + +// impl jack::NotificationHandler for JackNotificationHandler { +// fn shutdown(&mut self, _status: jack::ClientStatus, reason: &str) { +// self.send_error(format!("JACK was shut down for reason: {}", reason)); +// } + +// fn sample_rate(&mut self, _: &jack::Client, srate: jack::Frames) -> jack::Control { +// match self.init_sample_rate_flag.load(Ordering::SeqCst) { +// false => { +// // One of these notifications is sent every time a client is started. +// self.init_sample_rate_flag.store(true, Ordering::SeqCst); +// jack::Control::Continue +// } +// true => { +// self.send_error(format!("sample rate changed to: {}", srate)); +// // Since CPAL currently has no way of signaling a sample rate change in order to make +// // all necessary changes that would bring we choose to quit. +// jack::Control::Quit +// } +// } +// } + +// fn xrun(&mut self, _: &jack::Client) -> jack::Control { +// self.send_error(String::from("xrun (buffer over or under run)")); +// jack::Control::Continue +// } +// } diff --git a/src/platform/mod.rs b/src/platform/mod.rs index 3b566f15f..ec35c1ea9 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -586,8 +586,14 @@ mod platform_impl { SupportedInputConfigs as JackSupportedInputConfigs, SupportedOutputConfigs as JackSupportedOutputConfigs, }; + #[cfg(feature = "pipewire")] + pub use crate::host::pipewire::{ + Device as PipeWireDevice, Devices as PipeWireDevices, Host as PipeWireHost, + Stream as PipeWireStream, SupportedInputConfigs as PipeWireSupportedInputConfigs, + SupportedOutputConfigs as PipeWireupportedOutputConfigs, + }; - impl_platform_host!(#[cfg(feature = "jack")] Jack jack "JACK", Alsa alsa "ALSA"); + impl_platform_host!(#[cfg(feature = "pipewire")] PipeWire pipewire "PipeWire", #[cfg(feature = "jack")] Jack jack "JACK", Alsa alsa "ALSA"); /// The default host for the current compilation target platform. pub fn default_host() -> Host {