Skip to content

Commit 4dbc008

Browse files
Zoxccuviper
authored andcommitted
Add callbacks for when threads start and stop doing work
1 parent 3b72c1e commit 4dbc008

File tree

4 files changed

+102
-14
lines changed

4 files changed

+102
-14
lines changed

rayon-core/src/lib.rs

+58-1
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,12 @@ pub struct ThreadPoolBuilder<S = DefaultSpawn> {
159159
/// Closure invoked to spawn threads.
160160
spawn_handler: S,
161161

162+
/// Closure invoked when starting computations in a thread.
163+
acquire_thread_handler: Option<Box<AcquireThreadHandler>>,
164+
165+
/// Closure invoked when blocking in a thread.
166+
release_thread_handler: Option<Box<ReleaseThreadHandler>>,
167+
162168
/// If false, worker threads will execute spawned jobs in a
163169
/// "depth-first" fashion. If true, they will do a "breadth-first"
164170
/// fashion. Depth-first is the default.
@@ -201,12 +207,22 @@ impl Default for ThreadPoolBuilder {
201207
start_handler: None,
202208
exit_handler: None,
203209
deadlock_handler: None,
210+
acquire_thread_handler: None,
211+
release_thread_handler: None,
204212
spawn_handler: DefaultSpawn,
205213
breadth_first: false,
206214
}
207215
}
208216
}
209217

218+
/// The type for a closure that gets invoked before starting computations in a thread.
219+
/// Note that this same closure may be invoked multiple times in parallel.
220+
type AcquireThreadHandler = dyn Fn() + Send + Sync;
221+
222+
/// The type for a closure that gets invoked before blocking in a thread.
223+
/// Note that this same closure may be invoked multiple times in parallel.
224+
type ReleaseThreadHandler = dyn Fn() + Send + Sync;
225+
210226
impl ThreadPoolBuilder {
211227
/// Creates and returns a valid rayon thread pool builder, but does not initialize it.
212228
pub fn new() -> Self {
@@ -309,7 +325,12 @@ impl ThreadPoolBuilder {
309325
Ok(())
310326
})
311327
.build()?;
312-
Ok(with_pool(&pool))
328+
let result = unwind::halt_unwinding(|| with_pool(&pool));
329+
pool.wait_until_stopped();
330+
match result {
331+
Ok(result) => Ok(result),
332+
Err(err) => unwind::resume_unwinding(err),
333+
}
313334
});
314335

315336
match result {
@@ -388,6 +409,8 @@ impl<S> ThreadPoolBuilder<S> {
388409
start_handler: self.start_handler,
389410
exit_handler: self.exit_handler,
390411
deadlock_handler: self.deadlock_handler,
412+
acquire_thread_handler: self.acquire_thread_handler,
413+
release_thread_handler: self.release_thread_handler,
391414
breadth_first: self.breadth_first,
392415
}
393416
}
@@ -546,6 +569,34 @@ impl<S> ThreadPoolBuilder<S> {
546569
self.breadth_first
547570
}
548571

572+
/// Takes the current acquire thread callback, leaving `None`.
573+
fn take_acquire_thread_handler(&mut self) -> Option<Box<AcquireThreadHandler>> {
574+
self.acquire_thread_handler.take()
575+
}
576+
577+
/// Set a callback to be invoked when starting computations in a thread.
578+
pub fn acquire_thread_handler<H>(mut self, acquire_thread_handler: H) -> Self
579+
where
580+
H: Fn() + Send + Sync + 'static,
581+
{
582+
self.acquire_thread_handler = Some(Box::new(acquire_thread_handler));
583+
self
584+
}
585+
586+
/// Takes the current release thread callback, leaving `None`.
587+
fn take_release_thread_handler(&mut self) -> Option<Box<ReleaseThreadHandler>> {
588+
self.release_thread_handler.take()
589+
}
590+
591+
/// Set a callback to be invoked when blocking in thread.
592+
pub fn release_thread_handler<H>(mut self, release_thread_handler: H) -> Self
593+
where
594+
H: Fn() + Send + Sync + 'static,
595+
{
596+
self.release_thread_handler = Some(Box::new(release_thread_handler));
597+
self
598+
}
599+
549600
/// Takes the current deadlock callback, leaving `None`.
550601
fn take_deadlock_handler(&mut self) -> Option<Box<DeadlockHandler>> {
551602
self.deadlock_handler.take()
@@ -716,6 +767,8 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
716767
ref deadlock_handler,
717768
ref start_handler,
718769
ref exit_handler,
770+
ref acquire_thread_handler,
771+
ref release_thread_handler,
719772
spawn_handler: _,
720773
ref breadth_first,
721774
} = *self;
@@ -733,6 +786,8 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
733786
let deadlock_handler = deadlock_handler.as_ref().map(|_| ClosurePlaceholder);
734787
let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
735788
let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
789+
let acquire_thread_handler = acquire_thread_handler.as_ref().map(|_| ClosurePlaceholder);
790+
let release_thread_handler = release_thread_handler.as_ref().map(|_| ClosurePlaceholder);
736791

737792
f.debug_struct("ThreadPoolBuilder")
738793
.field("num_threads", num_threads)
@@ -742,6 +797,8 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
742797
.field("deadlock_handler", &deadlock_handler)
743798
.field("start_handler", &start_handler)
744799
.field("exit_handler", &exit_handler)
800+
.field("acquire_thread_handler", &acquire_thread_handler)
801+
.field("release_thread_handler", &release_thread_handler)
745802
.field("breadth_first", &breadth_first)
746803
.finish()
747804
}

rayon-core/src/registry.rs

+30-9
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ use std::usize;
2525
use unwind;
2626
use util::leak;
2727
use {
28-
DeadlockHandler, ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError,
29-
ThreadPoolBuilder,
28+
AcquireThreadHandler, DeadlockHandler, ErrorKind, ExitHandler, PanicHandler,
29+
ReleaseThreadHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
3030
};
3131

3232
/// Thread builder used for customization via
@@ -141,9 +141,11 @@ pub struct Registry {
141141
sleep: Sleep,
142142
injected_jobs: SegQueue<JobRef>,
143143
panic_handler: Option<Box<PanicHandler>>,
144-
deadlock_handler: Option<Box<DeadlockHandler>>,
144+
pub(crate) deadlock_handler: Option<Box<DeadlockHandler>>,
145145
start_handler: Option<Box<StartHandler>>,
146146
exit_handler: Option<Box<ExitHandler>>,
147+
pub(crate) acquire_thread_handler: Option<Box<AcquireThreadHandler>>,
148+
pub(crate) release_thread_handler: Option<Box<ReleaseThreadHandler>>,
147149

148150
// When this latch reaches 0, it means that all work on this
149151
// registry must be complete. This is ensured in the following ways:
@@ -248,6 +250,8 @@ impl Registry {
248250
deadlock_handler: builder.take_deadlock_handler(),
249251
start_handler: builder.take_start_handler(),
250252
exit_handler: builder.take_exit_handler(),
253+
acquire_thread_handler: builder.take_acquire_thread_handler(),
254+
release_thread_handler: builder.take_release_thread_handler(),
251255
});
252256

253257
// If we return early or panic, make sure to terminate existing threads.
@@ -355,11 +359,24 @@ impl Registry {
355359

356360
/// Waits for the worker threads to stop. This is used for testing
357361
/// -- so we can check that termination actually works.
358-
#[cfg(test)]
359362
pub(super) fn wait_until_stopped(&self) {
363+
self.release_thread();
360364
for info in &self.thread_infos {
361365
info.stopped.wait();
362366
}
367+
self.acquire_thread();
368+
}
369+
370+
pub(crate) fn acquire_thread(&self) {
371+
if let Some(ref acquire_thread_handler) = self.acquire_thread_handler {
372+
acquire_thread_handler();
373+
}
374+
}
375+
376+
pub(crate) fn release_thread(&self) {
377+
if let Some(ref release_thread_handler) = self.release_thread_handler {
378+
release_thread_handler();
379+
}
363380
}
364381

365382
/// ////////////////////////////////////////////////////////////////////////
@@ -509,7 +526,9 @@ impl Registry {
509526
l,
510527
);
511528
self.inject(&[job.as_job_ref()]);
529+
self.release_thread();
512530
job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
531+
self.acquire_thread();
513532
job.into_result()
514533
})
515534
}
@@ -743,11 +762,10 @@ impl WorkerThread {
743762
yields = self.registry.sleep.work_found(self.index, yields);
744763
self.execute(job);
745764
} else {
746-
yields = self.registry.sleep.no_work_found(
747-
self.index,
748-
yields,
749-
&self.registry.deadlock_handler,
750-
);
765+
yields = self
766+
.registry
767+
.sleep
768+
.no_work_found(self.index, yields, &self.registry);
751769
}
752770
}
753771

@@ -839,6 +857,7 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usiz
839857
}
840858
}
841859

860+
registry.acquire_thread();
842861
worker_thread.wait_until(&registry.terminate_latch);
843862

844863
// Should not be any work left in our queue.
@@ -861,6 +880,8 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usiz
861880
}
862881
// We're already exiting the thread, there's nothing else to do.
863882
}
883+
884+
registry.release_thread();
864885
}
865886

866887
/// If already in a worker-thread, just execute `op`. Otherwise,

rayon-core/src/sleep/mod.rs

+8-4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
//! for an overview.
33
44
use log::Event::*;
5+
use registry::Registry;
56
use std::sync::atomic::{AtomicUsize, Ordering};
67
use std::sync::{Condvar, Mutex};
78
use std::thread;
@@ -117,7 +118,7 @@ impl Sleep {
117118
&self,
118119
worker_index: usize,
119120
yields: usize,
120-
deadlock_handler: &Option<Box<DeadlockHandler>>,
121+
registry: &Registry,
121122
) -> usize {
122123
log!(DidNotFindWork {
123124
worker: worker_index,
@@ -145,7 +146,7 @@ impl Sleep {
145146
}
146147
} else {
147148
debug_assert_eq!(yields, ROUNDS_UNTIL_ASLEEP);
148-
self.sleep(worker_index, deadlock_handler);
149+
self.sleep(worker_index, registry);
149150
0
150151
}
151152
}
@@ -248,7 +249,7 @@ impl Sleep {
248249
self.worker_is_sleepy(state, worker_index)
249250
}
250251

251-
fn sleep(&self, worker_index: usize, deadlock_handler: &Option<Box<DeadlockHandler>>) {
252+
fn sleep(&self, worker_index: usize, registry: &Registry) {
252253
loop {
253254
// Acquire here suffices. If we observe that the current worker is still
254255
// sleepy, then in fact we know that no writes have occurred, and anyhow
@@ -327,12 +328,15 @@ impl Sleep {
327328

328329
// Decrement the number of active threads and check for a deadlock
329330
data.active_threads -= 1;
330-
data.deadlock_check(deadlock_handler);
331+
data.deadlock_check(&registry.deadlock_handler);
332+
333+
registry.release_thread();
331334

332335
let _ = self.tickle.wait(data).unwrap();
333336
log!(GotAwoken {
334337
worker: worker_index
335338
});
339+
registry.acquire_thread();
336340
return;
337341
}
338342
} else {

rayon-core/src/thread_pool/mod.rs

+6
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,12 @@ impl ThreadPool {
274274
// We assert that `self.registry` has not terminated.
275275
unsafe { spawn::spawn_fifo_in(op, &self.registry) }
276276
}
277+
278+
pub(crate) fn wait_until_stopped(self) {
279+
let registry = self.registry.clone();
280+
drop(self);
281+
registry.wait_until_stopped();
282+
}
277283
}
278284

279285
impl Drop for ThreadPool {

0 commit comments

Comments
 (0)