Skip to content

Commit 08697c8

Browse files
djgcramertj
authored andcommitted
Allow creation of CpuPool to be fallible.
To allow error handling in the face of errors returned by thread creation.
1 parent c5c976d commit 08697c8

File tree

2 files changed

+41
-20
lines changed

2 files changed

+41
-20
lines changed

futures-cpupool/src/lib.rs

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
//! # fn main() {
2020
//!
2121
//! // Create a worker thread pool with four threads
22-
//! let pool = CpuPool::new(4);
22+
//! let pool = CpuPool::new(4).unwrap();
2323
//!
2424
//! // Execute some work on the thread pool, optionally closing over data.
2525
//! let a = pool.spawn(long_running_future(2));
@@ -40,6 +40,7 @@
4040
extern crate futures;
4141
extern crate num_cpus;
4242

43+
use std::io;
4344
use std::panic::{self, AssertUnwindSafe};
4445
use std::sync::{Arc, Mutex};
4546
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@@ -147,16 +148,24 @@ impl CpuPool {
147148
///
148149
/// ```rust
149150
/// # use futures_cpupool::{Builder, CpuPool};
151+
/// # use std::io;
150152
/// #
151-
/// # fn new(size: usize) -> CpuPool {
153+
/// # fn new(size: usize) -> io::Result<CpuPool> {
152154
/// Builder::new().pool_size(size).create()
153155
/// # }
154156
/// ```
155157
///
158+
/// # Errors
159+
///
160+
/// This method yields an [`io::Result`] to capture any failure to
161+
/// create the thread at the OS level.
162+
///
163+
/// [`io::Result`]: https://doc.rust-lang.org/stable/std/io/type.Result.html
164+
///
156165
/// # Panics
157166
///
158167
/// Panics if `size == 0`.
159-
pub fn new(size: usize) -> CpuPool {
168+
pub fn new(size: usize) -> io::Result<CpuPool> {
160169
Builder::new().pool_size(size).create()
161170
}
162171

@@ -167,12 +176,20 @@ impl CpuPool {
167176
///
168177
/// ```rust
169178
/// # use futures_cpupool::{Builder, CpuPool};
179+
/// # use std::io;
170180
/// #
171-
/// # fn new_num_cpus() -> CpuPool {
181+
/// # fn new_num_cpus() -> io::Result<CpuPool> {
172182
/// Builder::new().create()
173183
/// # }
174184
/// ```
175-
pub fn new_num_cpus() -> CpuPool {
185+
///
186+
/// # Errors
187+
///
188+
/// This method yields an [`io::Result`] to capture any failure to
189+
/// create the thread at the OS level.
190+
///
191+
/// [`io::Result`]: https://doc.rust-lang.org/stable/std/io/type.Result.html
192+
pub fn new_num_cpus() -> io::Result<CpuPool> {
176193
Builder::new().create()
177194
}
178195

@@ -398,10 +415,17 @@ impl Builder {
398415

399416
/// Create CpuPool with configured parameters
400417
///
418+
/// # Errors
419+
///
420+
/// This method yields an [`io::Result`] to capture any failure to
421+
/// create the thread at the OS level.
422+
///
423+
/// [`io::Result`]: https://doc.rust-lang.org/stable/std/io/type.Result.html
424+
///
401425
/// # Panics
402426
///
403427
/// Panics if `pool_size == 0`.
404-
pub fn create(&mut self) -> CpuPool {
428+
pub fn create(&mut self) -> io::Result<CpuPool> {
405429
let (tx, rx) = mpsc::channel();
406430
let pool = CpuPool {
407431
inner: Arc::new(Inner {
@@ -424,9 +448,9 @@ impl Builder {
424448
if self.stack_size > 0 {
425449
thread_builder = thread_builder.stack_size(self.stack_size);
426450
}
427-
thread_builder.spawn(move || inner.work(after_start, before_stop)).unwrap();
451+
thread_builder.spawn(move || inner.work(after_start, before_stop))?;
428452
}
429-
return pool
453+
Ok(pool)
430454
}
431455
}
432456

futures-cpupool/tests/smoke.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,15 @@ use std::thread;
66
use std::time::Duration;
77

88
use futures::future::Future;
9-
use futures_cpupool::{CpuPool, Builder};
9+
use futures_cpupool::{Builder, CpuPool};
1010

1111
fn done<T: Send + 'static>(t: T) -> Box<Future<Item = T, Error = ()> + Send> {
1212
Box::new(futures::future::ok(t))
1313
}
1414

1515
#[test]
1616
fn join() {
17-
let pool = CpuPool::new(2);
17+
let pool = CpuPool::new(2).unwrap();
1818
let a = pool.spawn(done(1));
1919
let b = pool.spawn(done(2));
2020
let res = a.join(b).map(|(a, b)| a + b).wait();
@@ -24,7 +24,7 @@ fn join() {
2424

2525
#[test]
2626
fn select() {
27-
let pool = CpuPool::new(2);
27+
let pool = CpuPool::new(2).unwrap();
2828
let a = pool.spawn(done(1));
2929
let b = pool.spawn(done(2));
3030
let (item1, next) = a.select(b).wait().ok().unwrap();
@@ -48,7 +48,7 @@ fn threads_go_away() {
4848

4949
thread_local!(static FOO: A = A);
5050

51-
let pool = CpuPool::new(2);
51+
let pool = CpuPool::new(2).unwrap();
5252
let _handle = pool.spawn_fn(|| {
5353
FOO.with(|_| ());
5454
Ok::<(), ()>(())
@@ -57,7 +57,7 @@ fn threads_go_away() {
5757

5858
for _ in 0..100 {
5959
if CNT.load(Ordering::SeqCst) == 1 {
60-
return
60+
return;
6161
}
6262
thread::sleep(Duration::from_millis(10));
6363
}
@@ -81,10 +81,9 @@ fn lifecycle_test() {
8181
.pool_size(4)
8282
.after_start(after_start)
8383
.before_stop(before_stop)
84-
.create();
85-
let _handle = pool.spawn_fn(|| {
86-
Ok::<(), ()>(())
87-
});
84+
.create()
85+
.unwrap();
86+
let _handle = pool.spawn_fn(|| Ok::<(), ()>(()));
8887
drop(pool);
8988

9089
for _ in 0..100 {
@@ -99,9 +98,7 @@ fn lifecycle_test() {
9998

10099
#[test]
101100
fn thread_name() {
102-
let pool = Builder::new()
103-
.name_prefix("my-pool-")
104-
.create();
101+
let pool = Builder::new().name_prefix("my-pool-").create().unwrap();
105102
let future = pool.spawn_fn(|| {
106103
assert!(thread::current().name().unwrap().starts_with("my-pool-"));
107104
Ok::<(), ()>(())

0 commit comments

Comments
 (0)