Skip to content

Commit

Permalink
search: Move to rayon thread pool instead of blocking tokio thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
tontinton committed Jun 2, 2024
1 parent f387d42 commit 48347bc
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 13 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ version = "0.1.0"
edition = "2021"
authors = ["Tony Solomonik @ [email protected]"]

# https://github.com/eyre-rs/color-eyre#improving-perf-on-debug-builds
[profile.dev.package.backtrace]
opt-level = 3

[profile.release]
strip = true
lto = true
Expand All @@ -26,6 +22,7 @@ log = "0.4.21"
once_cell = "1.19.0"
opendal = { version = "0.46.0", features = ["services-fs"] }
pretty_env_logger = "0.5.0"
rayon = "1.10.0"
serde = { version = "1.0.201", features = ["derive", "rc"] }
serde_json = "1.0.117"
serde_yaml = "0.9.34"
Expand Down
67 changes: 58 additions & 9 deletions src/commands/search.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
use std::collections::BTreeMap;
use std::{backtrace::Backtrace, collections::BTreeMap};

use color_eyre::{eyre::eyre, Result};
use futures::future::{try_join, try_join_all};
use futures::future::{select, select_all, Either};
use rayon::{ThreadPool, ThreadPoolBuilder};
use sqlx::PgPool;
use tantivy::{
collector::TopDocs,
query::QueryParser,
schema::{OwnedValue, Schema},
Document, Index, ReloadPolicy, TantivyDocument,
};
use tokio::{spawn, sync::mpsc::channel, task::spawn_blocking};
use tokio::{
spawn,
sync::{mpsc, oneshot},
};

use crate::{
args::SearchArgs,
Expand Down Expand Up @@ -60,11 +64,42 @@ fn get_prettified_json(
Ok(serde_json::to_string(&prettified_field_map)?)
}

fn run_on_thread_pool<F, R>(thread_pool: &ThreadPool, task: F) -> oneshot::Receiver<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = oneshot::channel();
thread_pool.spawn(move || {
if tx.is_closed() {
return;
}
let _ = tx.send(task());
});
rx
}

pub async fn run_search(args: SearchArgs, pool: PgPool) -> Result<()> {
if args.limit == 0 {
return Ok(());
}

let thread_pool = ThreadPoolBuilder::new()
.thread_name(|id| format!("search-{id}"))
.panic_handler(|info| {
let msg = if let Some(s) = info.downcast_ref::<&str>() {
format!("Thread panic: {:?}", s)
} else if let Some(s) = info.downcast_ref::<String>() {
format!("Thread panic: {:?}", s)
} else {
"?".to_string()
};

error!("task in search thread pool panicked: {}", msg);
eprintln!("Backtrace: {}", Backtrace::capture());
})
.build()?;

let config = get_index_config(&args.name, &pool).await?;

let indexed_field_names = {
Expand All @@ -83,17 +118,15 @@ pub async fn run_search(args: SearchArgs, pool: PgPool) -> Result<()> {
.map(|(_, x)| x)
.collect::<Vec<_>>();

let (tx, mut rx) = channel(args.limit);
let mut tx_handles = Vec::with_capacity(directories.len());
let (tx, mut rx) = mpsc::channel(args.limit);

// Should be chunked to never starve the thread pool (default in tokio is 500 threads).
for directory in directories {
let tx = tx.clone();
let query = args.query.clone();
let indexed_field_names = indexed_field_names.clone();

// Should use rayon if search ends up being cpu bound (it seems io bound).
tx_handles.push(spawn_blocking(move || -> Result<()> {
tx_handles.push(run_on_thread_pool(&thread_pool, move || -> Result<()> {
if tx.is_closed() {
return Ok(());
}
Expand Down Expand Up @@ -132,7 +165,7 @@ pub async fn run_search(args: SearchArgs, pool: PgPool) -> Result<()> {
}));
}

let rx_handle = spawn(async move {
let mut rx_handle = spawn(async move {
let mut i = 0;
while let Some(doc) = rx.recv().await {
println!("{}", doc);
Expand All @@ -144,7 +177,23 @@ pub async fn run_search(args: SearchArgs, pool: PgPool) -> Result<()> {
rx.close();
});

try_join(try_join_all(tx_handles), rx_handle).await?;
loop {
match select(&mut rx_handle, select_all(&mut tx_handles)).await {
Either::Left(..) => {
break;
}
Either::Right(((result, i, _), _)) => {
if let Ok(Err(e)) = result {
error!("error in search task: {}", e);
}

tx_handles.remove(i);
if tx_handles.is_empty() {
break;
}
}
};
}

Ok(())
}

0 comments on commit 48347bc

Please sign in to comment.