Skip to content
This repository was archived by the owner on Jun 8, 2021. It is now read-only.

Commit e8a28ad

Browse files
authored
Merge pull request #578 from sdroege/thread-pool
Add glib::ThreadPool bindings
2 parents e003445 + 8aaf44a commit e8a28ad

File tree

2 files changed

+209
-0
lines changed

2 files changed

+209
-0
lines changed

src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,9 @@ mod main_context_futures;
192192
mod source_futures;
193193
pub use source_futures::*;
194194

195+
mod thread_pool;
196+
pub use thread_pool::ThreadPool;
197+
195198
// Actual thread IDs can be reused by the OS once the old thread finished.
196199
// This works around it by using our own counter for threads.
197200
//

src/thread_pool.rs

+206
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
// Copyright 2020, The Gtk-rs Project Developers.
2+
// See the COPYRIGHT file at the top-level directory of this distribution.
3+
// Licensed under the MIT license, see the LICENSE file or <http://opensource.org/licenses/MIT>
4+
5+
use glib_sys;
6+
use translate::*;
7+
8+
use futures_channel::oneshot;
9+
use std::future::Future;
10+
use std::ptr;
11+
12+
#[derive(Debug)]
13+
pub struct ThreadPool(ptr::NonNull<glib_sys::GThreadPool>);
14+
15+
unsafe impl Send for ThreadPool {}
16+
unsafe impl Sync for ThreadPool {}
17+
18+
impl ThreadPool {
19+
pub fn new_shared(max_threads: Option<u32>) -> Result<Self, ::Error> {
20+
unsafe {
21+
let mut err = ptr::null_mut();
22+
let pool = glib_sys::g_thread_pool_new(
23+
Some(spawn_func),
24+
ptr::null_mut(),
25+
max_threads.map(|v| v as i32).unwrap_or(-1),
26+
glib_sys::GFALSE,
27+
&mut err,
28+
);
29+
if pool.is_null() {
30+
Err(from_glib_full(err))
31+
} else {
32+
Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
33+
}
34+
}
35+
}
36+
37+
pub fn new_exclusive(max_threads: u32) -> Result<Self, ::Error> {
38+
unsafe {
39+
let mut err = ptr::null_mut();
40+
let pool = glib_sys::g_thread_pool_new(
41+
Some(spawn_func),
42+
ptr::null_mut(),
43+
max_threads as i32,
44+
glib_sys::GTRUE,
45+
&mut err,
46+
);
47+
if pool.is_null() {
48+
Err(from_glib_full(err))
49+
} else {
50+
Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
51+
}
52+
}
53+
}
54+
55+
pub fn push<F: FnOnce() + Send + 'static>(&self, func: F) -> Result<(), ::Error> {
56+
unsafe {
57+
let func: Box<dyn FnOnce() + Send + 'static> = Box::new(func);
58+
let func = Box::new(func);
59+
let mut err = ptr::null_mut();
60+
61+
let func = Box::into_raw(func);
62+
let ret: bool = from_glib(glib_sys::g_thread_pool_push(
63+
self.0.as_ptr(),
64+
func as *mut _,
65+
&mut err,
66+
));
67+
if ret {
68+
Ok(())
69+
} else {
70+
let _ = Box::from_raw(func);
71+
Err(from_glib_full(err))
72+
}
73+
}
74+
}
75+
76+
pub fn push_future<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
77+
&self,
78+
func: F,
79+
) -> Result<impl Future<Output = T>, ::Error> {
80+
use futures_util::future::FutureExt;
81+
82+
let (sender, receiver) = oneshot::channel();
83+
84+
self.push(move || {
85+
let _ = sender.send(func());
86+
})?;
87+
88+
Ok(receiver.map(|res| res.expect("Dropped before executing")))
89+
}
90+
91+
pub fn set_max_threads(&self, max_threads: Option<u32>) -> Result<(), ::Error> {
92+
unsafe {
93+
let mut err = ptr::null_mut();
94+
let ret: bool = from_glib(glib_sys::g_thread_pool_set_max_threads(
95+
self.0.as_ptr(),
96+
max_threads.map(|v| v as i32).unwrap_or(-1),
97+
&mut err,
98+
));
99+
if ret {
100+
Ok(())
101+
} else {
102+
Err(from_glib_full(err))
103+
}
104+
}
105+
}
106+
107+
pub fn get_max_threads(&self) -> Option<u32> {
108+
unsafe {
109+
let max_threads = glib_sys::g_thread_pool_get_max_threads(self.0.as_ptr());
110+
if max_threads == -1 {
111+
None
112+
} else {
113+
Some(max_threads as u32)
114+
}
115+
}
116+
}
117+
118+
pub fn get_num_threads(&self) -> u32 {
119+
unsafe { glib_sys::g_thread_pool_get_num_threads(self.0.as_ptr()) }
120+
}
121+
122+
pub fn get_unprocessed(&self) -> u32 {
123+
unsafe { glib_sys::g_thread_pool_unprocessed(self.0.as_ptr()) }
124+
}
125+
126+
pub fn set_max_unused_threads(max_threads: Option<u32>) {
127+
unsafe {
128+
glib_sys::g_thread_pool_set_max_unused_threads(
129+
max_threads.map(|v| v as i32).unwrap_or(-1),
130+
)
131+
}
132+
}
133+
134+
pub fn get_max_unused_threads() -> Option<u32> {
135+
unsafe {
136+
let max_unused_threads = glib_sys::g_thread_pool_get_max_unused_threads();
137+
if max_unused_threads == -1 {
138+
None
139+
} else {
140+
Some(max_unused_threads as u32)
141+
}
142+
}
143+
}
144+
145+
pub fn get_num_unused_threads() -> u32 {
146+
unsafe { glib_sys::g_thread_pool_get_num_unused_threads() }
147+
}
148+
149+
pub fn stop_unused_threads() {
150+
unsafe {
151+
glib_sys::g_thread_pool_stop_unused_threads();
152+
}
153+
}
154+
155+
pub fn set_max_idle_time(max_idle_time: u32) {
156+
unsafe { glib_sys::g_thread_pool_set_max_idle_time(max_idle_time) }
157+
}
158+
159+
pub fn get_max_idle_time() -> u32 {
160+
unsafe { glib_sys::g_thread_pool_get_max_idle_time() }
161+
}
162+
}
163+
164+
impl Drop for ThreadPool {
165+
fn drop(&mut self) {
166+
unsafe {
167+
glib_sys::g_thread_pool_free(self.0.as_ptr(), glib_sys::GFALSE, glib_sys::GTRUE);
168+
}
169+
}
170+
}
171+
172+
unsafe extern "C" fn spawn_func(func: glib_sys::gpointer, _data: glib_sys::gpointer) {
173+
let func: Box<Box<dyn FnOnce()>> = Box::from_raw(func as *mut _);
174+
func()
175+
}
176+
177+
#[cfg(test)]
178+
mod tests {
179+
use super::*;
180+
181+
#[test]
182+
fn test_push() {
183+
use std::sync::mpsc;
184+
185+
let p = ThreadPool::new_exclusive(1).unwrap();
186+
let (sender, receiver) = mpsc::channel();
187+
188+
p.push(move || {
189+
sender.send(true).unwrap();
190+
})
191+
.unwrap();
192+
193+
assert_eq!(receiver.recv(), Ok(true));
194+
}
195+
196+
#[test]
197+
fn test_push_future() {
198+
let c = ::MainContext::new();
199+
let p = ThreadPool::new_shared(None).unwrap();
200+
201+
let fut = p.push_future(|| true).unwrap();
202+
203+
let res = c.block_on(fut);
204+
assert!(res);
205+
}
206+
}

0 commit comments

Comments
 (0)