diff --git a/rs/cli/Cargo.toml b/rs/cli/Cargo.toml index 0742ba5..5f199ea 100644 --- a/rs/cli/Cargo.toml +++ b/rs/cli/Cargo.toml @@ -2,12 +2,12 @@ name = "cli" version = "0.1.0" edition = "2021" +description = "A CLI to process audio via the Elementary runtime" [dependencies] elem = { path = "../elem" } server = { path = "../server" } cxx = "1.0.131" -env_logger = "0.11.5" futures-util = "0.3.31" hound = "3.5.1" log = "0.4.22" @@ -16,6 +16,12 @@ serde_json = "1.0.133" tokio = { version = "1.41.1", features = ["full"] } tokio-tungstenite = "0.24.0" cpal = "0.15.3" +tracing = "0.1.41" +tracing-subscriber = "0.3.19" +thiserror = "2.0.11" +ringbuf = "0.4.7" +clap = { version = "4.5.29", features = ["derive"] } +clap_derive = { version = "4.0.0-rc.1" } [build-dependencies] cxx-build = "1.0.131" diff --git a/rs/cli/flake.lock b/rs/cli/flake.lock new file mode 100644 index 0000000..3053cac --- /dev/null +++ b/rs/cli/flake.lock @@ -0,0 +1,96 @@ +{ + "nodes": { + "flake-parts": { + "inputs": { + "nixpkgs-lib": "nixpkgs-lib" + }, + "locked": { + "lastModified": 1743550720, + "narHash": "sha256-hIshGgKZCgWh6AYJpJmRgFdR3WUbkY04o82X05xqQiY=", + "owner": "hercules-ci", + "repo": "flake-parts", + "rev": "c621e8422220273271f52058f618c94e405bb0f5", + "type": "github" + }, + "original": { + "owner": "hercules-ci", + "repo": "flake-parts", + "type": "github" + } + }, + "nixpkgs": { + "locked": { + "lastModified": 1744932701, + "narHash": "sha256-fusHbZCyv126cyArUwwKrLdCkgVAIaa/fQJYFlCEqiU=", + "owner": "nixos", + "repo": "nixpkgs", + "rev": "b024ced1aac25639f8ca8fdfc2f8c4fbd66c48ef", + "type": "github" + }, + "original": { + "owner": "nixos", + "ref": "nixos-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "nixpkgs-lib": { + "locked": { + "lastModified": 1743296961, + "narHash": "sha256-b1EdN3cULCqtorQ4QeWgLMrd5ZGOjLSLemfa00heasc=", + "owner": "nix-community", + "repo": "nixpkgs.lib", + "rev": "e4822aea2a6d1cdd36653c134cacfd64c97ff4fa", + "type": "github" + }, + "original": { + "owner": "nix-community", + "repo": "nixpkgs.lib", + "type": "github" + } + }, + "nixpkgs_2": { + "locked": { + "lastModified": 1744536153, + "narHash": "sha256-awS2zRgF4uTwrOKwwiJcByDzDOdo3Q1rPZbiHQg/N38=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "18dd725c29603f582cf1900e0d25f9f1063dbf11", + "type": "github" + }, + "original": { + "owner": "NixOS", + "ref": "nixpkgs-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "flake-parts": "flake-parts", + "nixpkgs": "nixpkgs", + "rust-overlay": "rust-overlay" + } + }, + "rust-overlay": { + "inputs": { + "nixpkgs": "nixpkgs_2" + }, + "locked": { + "lastModified": 1745289264, + "narHash": "sha256-7nt+UJ7qaIUe2J7BdnEEph9n2eKEwxUwKS/QIr091uA=", + "owner": "oxalica", + "repo": "rust-overlay", + "rev": "3b7171858c20d5293360042936058fb0c4cb93a9", + "type": "github" + }, + "original": { + "owner": "oxalica", + "repo": "rust-overlay", + "type": "github" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/rs/cli/flake.nix b/rs/cli/flake.nix new file mode 100644 index 0000000..f962167 --- /dev/null +++ b/rs/cli/flake.nix @@ -0,0 +1,59 @@ +{ + inputs = { + nixpkgs.url = "github:nixos/nixpkgs/nixos-unstable"; + flake-parts.url = "github:hercules-ci/flake-parts"; + rust-overlay.url = "github:oxalica/rust-overlay"; + }; + + outputs = inputs: + inputs.flake-parts.lib.mkFlake { inherit inputs; } { + systems = [ "x86_64-linux" ]; + perSystem = { config, self', pkgs, lib, system, ... }: + let + runtimeDeps = with pkgs; [ alsa-lib speechd ]; + buildDeps = with pkgs; [ pkg-config rustPlatform.bindgenHook ]; + devDeps = with pkgs; [ gdb ]; + + cargoToml = builtins.fromTOML (builtins.readFile ./Cargo.toml); + msrv = cargoToml.package.rust-version; + + rustPackage = features: + (pkgs.makeRustPlatform { + cargo = pkgs.rust-bin.stable.latest.minimal; + rustc = pkgs.rust-bin.stable.latest.minimal; + }).buildRustPackage { + inherit (cargoToml.package) name version; + src = ./.; + cargoLock.lockFile = ./Cargo.lock; + buildFeatures = features; + buildInputs = runtimeDeps; + nativeBuildInputs = buildDeps; + # Uncomment if your cargo tests require networking or otherwise + # don't play nicely with the Nix build sandbox: + # doCheck = false; + }; + + mkDevShell = rustc: + pkgs.mkShell { + shellHook = '' + export RUST_SRC_PATH=${pkgs.rustPlatform.rustLibSrc} + ''; + buildInputs = runtimeDeps; + nativeBuildInputs = buildDeps ++ devDeps ++ [ rustc ]; + }; + in { + _module.args.pkgs = import inputs.nixpkgs { + inherit system; + overlays = [ (import inputs.rust-overlay) ]; + }; + + packages.default = self'.packages.cli; + devShells.default = self'.devShells.nightly; + + devShells.nightly = (mkDevShell (pkgs.rust-bin.selectLatestNightlyWith + (toolchain: toolchain.default))); + devShells.stable = (mkDevShell pkgs.rust-bin.stable.latest.default); + devShells.msrv = (mkDevShell pkgs.rust-bin.stable.${msrv}.default); + }; + }; +} diff --git a/rs/cli/src/main.rs b/rs/cli/src/main.rs index 7af2f4c..fc3c3d9 100644 --- a/rs/cli/src/main.rs +++ b/rs/cli/src/main.rs @@ -1,25 +1,104 @@ use elem::engine; -use std::sync::{Arc, Mutex}; -use std::{env, io::Error}; - +use clap::Parser; +use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; +use cpal::{BuildStreamError, DeviceNameError, DevicesError, PlayStreamError}; use futures_util::{SinkExt, StreamExt, TryStreamExt}; -use log::info; +use ringbuf::{traits::*, HeapRb}; +use std::sync::{Arc, Mutex}; +use thiserror::Error; use tokio::net::{TcpListener, TcpStream}; +use tracing::{debug, error, info}; +use tracing_subscriber::{filter::LevelFilter, fmt, prelude::*}; -use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; +#[derive(Parser, Debug)] +#[command(name = "elemcli", version, about, long_about = None)] +struct Args { + /// Lists all available audio devices + #[arg(long)] + list_devices: bool, + + /// Specifies the input audio device + #[arg(short, long, value_name = "INPUT_DEVICE")] + input: Option, + + /// Specifies the output audio device + #[arg(short, long, value_name = "OUTPUT_DEVICE")] + output: Option, + + /// Address at which to run the websocket + #[arg(short, long, default_value = "127.0.0.1:8080")] + addr: String, +} + +#[derive(Error, Debug)] +pub enum ElementaryCliError { + #[error("No input device available")] + NoInputDevice, + #[error("No output device available")] + NoOutputDevice, + #[error(transparent)] + ThreadError(#[from] ThreadError), + #[error("Could not construct device stream: {0}")] + DeviceStreamConstructionFailed(#[from] BuildStreamError), + #[error("Could not play device stream: {0}")] + DeviceStreamPlayFailed(#[from] PlayStreamError), + #[error(transparent)] + DeviceNameError(#[from] DeviceNameError), + #[error(transparent)] + DevicesEnumerationError(#[from] DevicesError), +} + +#[derive(Error, Debug)] +pub enum ThreadError { + #[error("Event loop error: {0}")] + EventLoop(String), + #[error("Event poller error: {0}")] + EventPoller(String), + #[error("TCP listener error: {0}")] + TCPListener(String), +} + +fn main() -> Result<(), ElementaryCliError> { + // User-facing logs at INFO layer + let info_layer = fmt::layer() + .without_time() // Remove the timestamp + .with_target(false) // Remove the target/module path + .with_level(false) // Remove the log level + .with_filter(LevelFilter::INFO); + + // Other logs + let debug_layer = fmt::layer() + .with_target(true) // Include the target/module path + .with_level(true) // Include the log level + .with_filter(LevelFilter::WARN) + .with_filter(LevelFilter::ERROR) + .with_filter(LevelFilter::DEBUG); + + tracing_subscriber::registry() + .with(info_layer) + .with(debug_layer) + .init(); -fn main() { - let _ = env_logger::try_init(); - let addr = env::args() - .nth(1) - .unwrap_or_else(|| "127.0.0.1:8080".to_string()); + let args = Args::parse(); - // Start the audio device + // Config parsing: input device, output device, bitrate, etc let host = cpal::default_host(); - let output_device = host - .default_output_device() - .expect("no output device available"); + + let output_device = if let Some(output_device_name) = args.output { + todo!("use this output instead of the default") + } else { + host.default_output_device() + .ok_or(ElementaryCliError::NoOutputDevice)? + }; + + let input_device = if let Some(input_device_name) = args.input { + todo!("use this input instead of the default") + } else { + host.default_input_device() + .ok_or(ElementaryCliError::NoInputDevice)? + }; + let mut supported_configs_range = output_device .supported_output_configs() .expect("error while querying configs"); @@ -28,49 +107,145 @@ fn main() { .expect("no supported config?!") .with_max_sample_rate(); - // Start the Elem engine + let config: cpal::StreamConfig = supported_config.into(); + + // Different commands + if args.list_devices { + info!("Listing all devices..."); + fn supports_output(device: &D) -> bool { + device + .supported_output_configs() + .map(|mut iter| iter.next().is_some()) + .unwrap_or(false) + } + + fn supports_input(device: &D) -> bool { + device + .supported_input_configs() + .map(|mut iter| iter.next().is_some()) + .unwrap_or(false) + } + + for device in host.devices()? { + let is_input = supports_input(&device); + let is_output = supports_output(&device); + let n = device.name()?; + match (is_input, is_output) { + (true, true) => info!("(in/out) {n}"), + (true, false) => info!("(in) {n}"), + (false, true) => info!("(out) {n}"), + (false, false) => (), // supports neither input nor output, so we don't show + } + } + + return Ok(()); + } + + // Establish a ring buffer to pump data from input to output The delay (implemented via the + // ring buffer) acts as a safety margin to absorb timing mismatches between the input and + // output streams. This ensures that there is always enough data in the buffer for the output + // stream to consume, even if the input and output streams are slightly out of sync. This + // prevents underflow (buffer running out of data) or overflow (buffer filling up too quickly), + // both of which can cause audible artifacts. + let latency_ms: f32 = 1000.0; + let latency_frames = (latency_ms / 1_000.0) * config.sample_rate.0 as f32; + let in_out_ring = HeapRb::new(latency_frames as usize); + + let (mut producer, mut consumer) = in_out_ring.split(); + + // Start the Elem engine and derive handles to it let (engine_main, engine_proc) = engine::new_engine(44100.0, 512); - // Hook up Elem engine with the device - let config: cpal::StreamConfig = supported_config.into(); - let _stream = output_device.build_output_stream( - &config, - move |data: &mut [f32], _| { - let num_channels = config.channels as usize; - for samples in data.chunks_mut(num_channels) { - engine_proc.process( - samples.as_ptr(), - samples.as_mut_ptr(), - num_channels, - samples.len(), - std::ptr::null_mut::<()>(), - ); + // Closure that indicates what to do when we get data on the default audio input. + // In our case, we push it to the `in_out_ring`, which is a ring buffer. + // If we can't push data to the ring buffer, then something's gone wrong + let input_data_fn = move |data: &[f32], _: &cpal::InputCallbackInfo| { + let mut output_fell_behind = false; + for &sample in data { + if producer.try_push(sample).is_err() { + output_fell_behind = true; } - }, - move |err| {}, - None, // None=blocking, Some(Duration)=timeout - ); + } + if output_fell_behind { + error!("Output stream fell behind: try increasing latency"); + // TODO: should we do something more substantial here? + } + }; + + // Closure periodically invoked by the default audio output. + // Whatever data we write to the `data` buffer will be 'shipped' to the output device. + let output_data_fn = move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { + let num_channels = config.channels as usize; + // TODO: how should I think about getting input data from the ring buffer here? + // should I pass it to the `input_data` in `engine_proc`? + // + // let mut input_fell_behind = None; + // + // for sample in data { + // *sample = match consumer.pop() { + // Ok(s) => s, + // Err(err) => { + // input_fell_behind = Some(err); + // 0.0 + // } + // }; + // } + // if let Some(err) = input_fell_behind { + // eprintln!( + // "input stream fell behind: {:?}: try increasing latency", + // err + // ); + // } + for samples in data.chunks_mut(num_channels) { + engine_proc.process( + samples.as_ptr(), + samples.as_mut_ptr(), + num_channels, + samples.len(), + std::ptr::null_mut::<()>(), + ); + } + }; + + let err_fn = move |_err| {}; + + // Hook up Elem engine with the output device + let input_stream = input_device.build_input_stream(&config, input_data_fn, err_fn, None)?; + info!("Input stream established."); + let output_stream = output_device.build_output_stream(&config, output_data_fn, err_fn, None)?; + info!("Output stream established."); + + // Necessary as streams will not automatically play on some platforms + input_stream.play()?; + output_stream.play()?; tokio::runtime::Builder::new_multi_thread() .enable_all() .build() - .unwrap() - .block_on(run_event_loop_main(addr, engine_main)) - .expect("Failed to start event loop") + .map_err(|e| ThreadError::EventLoop(e.to_string()))? + .block_on(run_event_loop_main(args.addr, engine_main)) + .map_err(|e| e.into()) } -async fn run_event_loop_main(addr: String, engine_main: engine::MainHandle) -> Result<(), Error> { +async fn run_event_loop_main( + addr: String, + engine_main: engine::MainHandle, +) -> Result<(), ThreadError> { let shared_engine_main = Arc::new(Mutex::new(engine_main)); - let (first, second) = tokio::join!( + // If either of the threads fails, we stop the program + let res = tokio::try_join!( tokio::spawn(run_event_poller(shared_engine_main.clone())), tokio::spawn(run_tcp_listener(addr, shared_engine_main.clone())), ); - first.unwrap_or(second.unwrap_or(Ok(()))) + match res { + Ok((first, second)) => first.and(second), + Err(e) => unreachable!("One of the event poller or TCP listener threads panicked... should always return an error? {}", e), + } } -async fn run_event_poller(engine_main: Arc>) -> Result<(), Error> { +async fn run_event_poller(engine_main: Arc>) -> Result<(), ThreadError> { let mut interval = tokio::time::interval(tokio::time::Duration::from_millis((1000.0 / 30.0) as u64)); @@ -90,7 +265,7 @@ async fn run_event_poller(engine_main: Arc>) -> Result async fn run_tcp_listener( addr: String, engine_main: Arc>, -) -> Result<(), Error> { +) -> Result<(), ThreadError> { // Create the TCP listener we'll accept connections on let try_socket = TcpListener::bind(&addr).await; let listener = try_socket.expect("Failed to bind"); diff --git a/rs/elem/src/engine.rs b/rs/elem/src/engine.rs index dfed55b..fd855c1 100644 --- a/rs/elem/src/engine.rs +++ b/rs/elem/src/engine.rs @@ -2,7 +2,7 @@ use crate::node::{NodeRepr, ShallowNodeRepr}; use crate::reconcile::reconcile; use crate::std::prelude::*; use std::cell::UnsafeCell; -use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; pub trait FloatType: 'static {} @@ -72,7 +72,7 @@ unsafe impl Send for EngineInternal {} unsafe impl Sync for EngineInternal {} impl EngineInternal { - pub fn add_shared_resource( + fn add_shared_resource( &self, name: &String, channels: usize, @@ -90,7 +90,7 @@ impl EngineInternal { } } - pub fn apply_instructions(&self, instructions: &serde_json::Value) -> Result { + fn apply_instructions(&self, instructions: &serde_json::Value) -> Result { unsafe { let result = self .inner @@ -105,7 +105,7 @@ impl EngineInternal { } } - pub fn process_queued_events(&self) -> Result { + fn process_queued_events(&self) -> Result { unsafe { let batch = self .inner @@ -126,7 +126,7 @@ pub struct ProcessHandle { } impl ProcessHandle { - pub fn new(inner: Arc) -> Self { + fn new(inner: Arc) -> Self { Self { inner } } @@ -163,7 +163,7 @@ pub struct MainHandle { } impl MainHandle { - pub fn new(inner: Arc) -> Self { + fn new(inner: Arc) -> Self { Self { inner, node_map: BTreeMap::new(), diff --git a/rs/elem/src/lib.rs b/rs/elem/src/lib.rs index 8320d25..01644d9 100644 --- a/rs/elem/src/lib.rs +++ b/rs/elem/src/lib.rs @@ -3,18 +3,3 @@ mod reconcile; pub mod engine; pub mod node; pub mod std; - -pub fn add(left: u64, right: u64) -> u64 { - left + right -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); - } -}