Skip to content

Commit 4f52312

Browse files
ezrosentEli Rosenthal
and
Eli Rosenthal
authored
Fix hanging core-relations tests. (#28)
* Fix deadlock condition and add mimalloc * remove a different deadlock source --------- Co-authored-by: Eli Rosenthal <[email protected]>
1 parent e1c938d commit 4f52312

File tree

4 files changed

+75
-22
lines changed

4 files changed

+75
-22
lines changed

core-relations/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ dyn-clone = "1.0.17"
3131
[dev-dependencies]
3232
once_cell = "1.20.2"
3333
divan = "0.1.15"
34+
mimalloc = "0.1"
3435

3536
[[bench]]
3637
name = "writes"

core-relations/src/free_join/mod.rs

+24-16
Original file line numberDiff line numberDiff line change
@@ -647,36 +647,44 @@ impl Drop for Database {
647647
/// This is in a separate function to allow us to reuse it while already
648648
/// borrowing a `TableInfo`.
649649
fn get_index_from_tableinfo(table_info: &TableInfo, cols: &[ColumnId]) -> HashIndex {
650-
let guard = table_info.indexes.entry(cols.into()).or_insert_with(|| {
651-
Arc::new(ReadOptimizedLock::new(Index::new(
652-
cols.to_vec(),
653-
TupleIndex::new(cols.len()),
654-
)))
655-
});
656-
let ix = guard.value().read();
650+
let index: Arc<_> = table_info
651+
.indexes
652+
.entry(cols.into())
653+
.or_insert_with(|| {
654+
Arc::new(ReadOptimizedLock::new(Index::new(
655+
cols.to_vec(),
656+
TupleIndex::new(cols.len()),
657+
)))
658+
})
659+
.clone();
660+
let ix = index.read();
657661
if ix.needs_refresh(table_info.table.as_ref()) {
658662
mem::drop(ix);
659-
let mut ix = guard.value().lock();
663+
let mut ix = index.lock();
660664
ix.refresh(table_info.table.as_ref());
661665
}
662-
guard.value().clone()
666+
index
663667
}
664668

665669
/// The core logic behind getting and updating a column index.
666670
///
667671
/// This is the single-column analog to [`get_index_from_tableinfo`].
668672
fn get_column_index_from_tableinfo(table_info: &TableInfo, col: ColumnId) -> HashColumnIndex {
669-
let index = table_info.column_indexes.entry(col).or_insert_with(|| {
670-
Arc::new(ReadOptimizedLock::new(Index::new(
671-
vec![col],
672-
ColumnIndex::new(),
673-
)))
674-
});
673+
let index: Arc<_> = table_info
674+
.column_indexes
675+
.entry(col)
676+
.or_insert_with(|| {
677+
Arc::new(ReadOptimizedLock::new(Index::new(
678+
vec![col],
679+
ColumnIndex::new(),
680+
)))
681+
})
682+
.clone();
675683
let ix = index.read();
676684
if ix.needs_refresh(table_info.table.as_ref()) {
677685
mem::drop(ix);
678686
let mut ix = index.lock();
679687
ix.refresh(table_info.table.as_ref());
680688
}
681-
index.clone()
689+
index
682690
}

core-relations/src/hash_index/mod.rs

+45-6
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22
use std::{
33
hash::{Hash, Hasher},
44
mem,
5-
sync::Mutex,
5+
sync::{Arc, Mutex},
66
};
77

8+
use concurrency::Notification;
89
use hashbrown::HashTable;
910
use numeric_id::{define_id, IdVec, NumericId};
1011
use once_cell::sync::Lazy;
@@ -260,22 +261,23 @@ impl IndexBase for ColumnIndex {
260261
}
261262
};
262263

263-
THREAD_POOL.install(|| {
264-
rayon::scope(|scope| {
264+
run_in_thread_pool_and_block(&THREAD_POOL, || {
265+
rayon::in_place_scope(|inner| {
265266
let mut cur = Offset::new(0);
266267
loop {
267268
let mut buf = TaggedRowBuffer::new(cols.len());
268269
if let Some(next) =
269270
table.scan_project(subset, cols, cur, BATCH_SIZE, &[], &mut buf)
270271
{
271272
cur = next;
272-
scope.spawn(move |_| split_buf(buf));
273+
inner.spawn(move |_| split_buf(buf));
273274
} else {
274-
scope.spawn(move |_| split_buf(buf));
275+
inner.spawn(move |_| split_buf(buf));
275276
break;
276277
}
277278
}
278279
});
280+
279281
self.shards.par_iter_mut().for_each(|(shard_id, shard)| {
280282
use indexmap::map::Entry;
281283
// Sort the vector by start row id to ensure we populate subsets in sorted order.
@@ -302,6 +304,43 @@ impl IndexBase for ColumnIndex {
302304
}
303305
}
304306

307+
/// This function is an alternative for [`rayon::ThreadPool::install`] that doesn't steal work from
308+
/// the callee's current thread pool while waiting for `f` to finish.
309+
///
310+
/// We do this to avoid deadlocks. The whole purpose of using a separate threadpool in this module
311+
/// is to allow for sufficient parallelism while holding a lock on the main threadpool. That means
312+
/// we are not worried about an outer lock tying up a thread in the main pool.
313+
///
314+
/// On the other hand, it _is_ a bad idea to steal work on a rayon thread pool with some locks
315+
/// held. In particular, if another task on the thread pool _itself_ attempts to aquire the same
316+
/// lock, this can cause a deadlock. We saw this in the tests for this crate. The relevant lock
317+
/// are those around individual indexes stored in the database-level index cache.
318+
fn run_in_thread_pool_and_block<'a>(pool: &rayon::ThreadPool, f: impl FnMut() + Send + 'a) {
319+
// NB: We don't need the heap allocations here. But we are only calling this function if
320+
// we are about to do a bunch of work, so clarify is probably going to be better than (even
321+
// more) unsafe code.
322+
323+
// Alright, here we go: pretend `f` has `'static` lifetime because we are passing it to
324+
// `spawn`.
325+
trait LifetimeWork<'a>: FnMut() + Send + 'a {}
326+
327+
impl<'a, F: FnMut() + Send + 'a> LifetimeWork<'a> for F {}
328+
let as_lifetime: Box<dyn LifetimeWork<'a>> = Box::new(f);
329+
let mut casted_away = unsafe {
330+
// SAFETY: `casted_away` will be dropped at the end of this method. The notification used
331+
// below will ensure it does not escape.
332+
mem::transmute::<Box<dyn LifetimeWork<'a>>, Box<dyn LifetimeWork<'static>>>(as_lifetime)
333+
};
334+
let n = Arc::new(Notification::new());
335+
let inner = n.clone();
336+
pool.spawn(move || {
337+
casted_away();
338+
mem::drop(casted_away);
339+
inner.notify();
340+
});
341+
n.wait()
342+
}
343+
305344
impl ColumnIndex {
306345
pub(crate) fn new() -> ColumnIndex {
307346
with_pool_set(|ps| {
@@ -447,7 +486,7 @@ impl IndexBase for TupleIndex {
447486
queues[shard_id].lock().unwrap().push((first, buf));
448487
}
449488
};
450-
THREAD_POOL.install(|| {
489+
run_in_thread_pool_and_block(&THREAD_POOL, || {
451490
rayon::scope(|scope| {
452491
let mut cur = Offset::new(0);
453492
loop {

core-relations/src/tests.rs

+5
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ use crate::{
1919
PlanStrategy,
2020
};
2121

22+
/// On MacOs the system allocator is vulenrable to contention, causing tests to execute quite
23+
/// slowly without mimalloc.
24+
#[global_allocator]
25+
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
26+
2227
#[test]
2328
fn basic_query() {
2429
let MathEgraph {

0 commit comments

Comments
 (0)