diff --git a/Cargo.lock b/Cargo.lock index be9502c..11c2a27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2619,6 +2619,7 @@ dependencies = [ "once_cell", "opendal", "pretty_env_logger", + "rayon", "serde", "serde_json", "serde_yaml", diff --git a/Cargo.toml b/Cargo.toml index 959576c..cec613a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,10 +4,6 @@ version = "0.1.0" edition = "2021" authors = ["Tony Solomonik @ tony.solomonik@gmail.com"] -# https://github.com/eyre-rs/color-eyre#improving-perf-on-debug-builds -[profile.dev.package.backtrace] -opt-level = 3 - [profile.release] strip = true lto = true @@ -26,6 +22,7 @@ log = "0.4.21" once_cell = "1.19.0" opendal = { version = "0.46.0", features = ["services-fs"] } pretty_env_logger = "0.5.0" +rayon = "1.10.0" serde = { version = "1.0.201", features = ["derive", "rc"] } serde_json = "1.0.117" serde_yaml = "0.9.34" diff --git a/src/commands/search.rs b/src/commands/search.rs index 32ece26..ff9514a 100644 --- a/src/commands/search.rs +++ b/src/commands/search.rs @@ -1,7 +1,8 @@ -use std::collections::BTreeMap; +use std::{backtrace::Backtrace, collections::BTreeMap}; use color_eyre::{eyre::eyre, Result}; -use futures::future::{try_join, try_join_all}; +use futures::future::{select, select_all, Either}; +use rayon::{ThreadPool, ThreadPoolBuilder}; use sqlx::PgPool; use tantivy::{ collector::TopDocs, @@ -9,7 +10,10 @@ use tantivy::{ schema::{OwnedValue, Schema}, Document, Index, ReloadPolicy, TantivyDocument, }; -use tokio::{spawn, sync::mpsc::channel, task::spawn_blocking}; +use tokio::{ + spawn, + sync::{mpsc, oneshot}, +}; use crate::{ args::SearchArgs, @@ -60,11 +64,42 @@ fn get_prettified_json( Ok(serde_json::to_string(&prettified_field_map)?) } +fn run_on_thread_pool(thread_pool: &ThreadPool, task: F) -> oneshot::Receiver +where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, +{ + let (tx, rx) = oneshot::channel(); + thread_pool.spawn(move || { + if tx.is_closed() { + return; + } + let _ = tx.send(task()); + }); + rx +} + pub async fn run_search(args: SearchArgs, pool: PgPool) -> Result<()> { if args.limit == 0 { return Ok(()); } + let thread_pool = ThreadPoolBuilder::new() + .thread_name(|id| format!("search-{id}")) + .panic_handler(|info| { + let msg = if let Some(s) = info.downcast_ref::<&str>() { + format!("Thread panic: {:?}", s) + } else if let Some(s) = info.downcast_ref::() { + format!("Thread panic: {:?}", s) + } else { + "?".to_string() + }; + + error!("task in search thread pool panicked: {}", msg); + eprintln!("Backtrace: {}", Backtrace::capture()); + }) + .build()?; + let config = get_index_config(&args.name, &pool).await?; let indexed_field_names = { @@ -83,17 +118,15 @@ pub async fn run_search(args: SearchArgs, pool: PgPool) -> Result<()> { .map(|(_, x)| x) .collect::>(); - let (tx, mut rx) = channel(args.limit); let mut tx_handles = Vec::with_capacity(directories.len()); + let (tx, mut rx) = mpsc::channel(args.limit); - // Should be chunked to never starve the thread pool (default in tokio is 500 threads). for directory in directories { let tx = tx.clone(); let query = args.query.clone(); let indexed_field_names = indexed_field_names.clone(); - // Should use rayon if search ends up being cpu bound (it seems io bound). - tx_handles.push(spawn_blocking(move || -> Result<()> { + tx_handles.push(run_on_thread_pool(&thread_pool, move || -> Result<()> { if tx.is_closed() { return Ok(()); } @@ -132,7 +165,7 @@ pub async fn run_search(args: SearchArgs, pool: PgPool) -> Result<()> { })); } - let rx_handle = spawn(async move { + let mut rx_handle = spawn(async move { let mut i = 0; while let Some(doc) = rx.recv().await { println!("{}", doc); @@ -144,7 +177,23 @@ pub async fn run_search(args: SearchArgs, pool: PgPool) -> Result<()> { rx.close(); }); - try_join(try_join_all(tx_handles), rx_handle).await?; + loop { + match select(&mut rx_handle, select_all(&mut tx_handles)).await { + Either::Left(..) => { + break; + } + Either::Right(((result, i, _), _)) => { + if let Ok(Err(e)) = result { + error!("error in search task: {}", e); + } + + tx_handles.remove(i); + if tx_handles.is_empty() { + break; + } + } + }; + } Ok(()) }