41
41
//! [^2] `MTLockRef` is a typedef.
42
42
43
43
pub use crate :: marker:: * ;
44
+ use parking_lot:: Mutex ;
45
+ use std:: any:: Any ;
44
46
use std:: collections:: HashMap ;
45
47
use std:: hash:: { BuildHasher , Hash } ;
46
48
use std:: ops:: { Deref , DerefMut } ;
@@ -103,6 +105,37 @@ mod mode {
103
105
104
106
pub use mode:: { is_dyn_thread_safe, set_dyn_thread_safe_mode} ;
105
107
108
+ /// A guard used to hold panics that occur during a parallel section to later by unwound.
109
+ /// This is used for the parallel compiler to prevent fatal errors from non-deterministically
110
+ /// hiding errors by ensuring that everything in the section has completed executing before
111
+ /// continuing with unwinding. It's also used for the non-parallel code to ensure error message
112
+ /// output match the parallel compiler for testing purposes.
113
+ pub struct ParallelGuard {
114
+ panic : Mutex < Option < Box < dyn Any + std:: marker:: Send + ' static > > > ,
115
+ }
116
+
117
+ impl ParallelGuard {
118
+ pub fn run < R > ( & self , f : impl FnOnce ( ) -> R ) -> Option < R > {
119
+ catch_unwind ( AssertUnwindSafe ( f) )
120
+ . map_err ( |err| {
121
+ * self . panic . lock ( ) = Some ( err) ;
122
+ } )
123
+ . ok ( )
124
+ }
125
+ }
126
+
127
+ /// This gives access to a fresh parallel guard in the closure and will unwind any panics
128
+ /// caught in it after the closure returns.
129
+ #[ inline]
130
+ pub fn parallel_guard < R > ( f : impl FnOnce ( & ParallelGuard ) -> R ) -> R {
131
+ let guard = ParallelGuard { panic : Mutex :: new ( None ) } ;
132
+ let ret = f ( & guard) ;
133
+ if let Some ( panic) = guard. panic . into_inner ( ) {
134
+ resume_unwind ( panic) ;
135
+ }
136
+ ret
137
+ }
138
+
106
139
cfg_if ! {
107
140
if #[ cfg( not( parallel_compiler) ) ] {
108
141
use std:: ops:: Add ;
@@ -198,67 +231,38 @@ cfg_if! {
198
231
where A : FnOnce ( ) -> RA ,
199
232
B : FnOnce ( ) -> RB
200
233
{
201
- ( oper_a( ) , oper_b( ) )
234
+ let ( a, b) = parallel_guard( |guard| {
235
+ let a = guard. run( oper_a) ;
236
+ let b = guard. run( oper_b) ;
237
+ ( a, b)
238
+ } ) ;
239
+ ( a. unwrap( ) , b. unwrap( ) )
202
240
}
203
241
204
242
#[ macro_export]
205
243
macro_rules! parallel {
206
- ( $( $blocks: block) , * ) => {
207
- // We catch panics here ensuring that all the blocks execute.
208
- // This makes behavior consistent with the parallel compiler.
209
- let mut panic = None ;
210
- $(
211
- if let Err ( p) = :: std:: panic:: catch_unwind(
212
- :: std:: panic:: AssertUnwindSafe ( || $blocks)
213
- ) {
214
- if panic. is_none( ) {
215
- panic = Some ( p) ;
216
- }
217
- }
218
- ) *
219
- if let Some ( panic) = panic {
220
- :: std:: panic:: resume_unwind( panic) ;
221
- }
222
- }
244
+ ( $( $blocks: block) , * ) => { {
245
+ $crate :: sync:: parallel_guard( |guard| {
246
+ $( guard. run( || $blocks) ; ) *
247
+ } ) ;
248
+ } }
223
249
}
224
250
225
251
pub fn par_for_each_in<T : IntoIterator >( t: T , mut for_each: impl FnMut ( T :: Item ) + Sync + Send ) {
226
- // We catch panics here ensuring that all the loop iterations execute.
227
- // This makes behavior consistent with the parallel compiler.
228
- let mut panic = None ;
229
- t. into_iter( ) . for_each( |i| {
230
- if let Err ( p) = catch_unwind( AssertUnwindSafe ( || for_each( i) ) ) {
231
- if panic. is_none( ) {
232
- panic = Some ( p) ;
233
- }
234
- }
235
- } ) ;
236
- if let Some ( panic) = panic {
237
- resume_unwind( panic) ;
238
- }
252
+ parallel_guard( |guard| {
253
+ t. into_iter( ) . for_each( |i| {
254
+ guard. run( || for_each( i) ) ;
255
+ } ) ;
256
+ } )
239
257
}
240
258
241
259
pub fn par_map<T : IntoIterator , R , C : FromIterator <R >>(
242
260
t: T ,
243
261
mut map: impl FnMut ( <<T as IntoIterator >:: IntoIter as Iterator >:: Item ) -> R ,
244
262
) -> C {
245
- // We catch panics here ensuring that all the loop iterations execute.
246
- let mut panic = None ;
247
- let r = t. into_iter( ) . filter_map( |i| {
248
- match catch_unwind( AssertUnwindSafe ( || map( i) ) ) {
249
- Ok ( r) => Some ( r) ,
250
- Err ( p) => {
251
- if panic. is_none( ) {
252
- panic = Some ( p) ;
253
- }
254
- None
255
- }
256
- }
257
- } ) . collect( ) ;
258
- if let Some ( panic) = panic {
259
- resume_unwind( panic) ;
260
- }
261
- r
263
+ parallel_guard( |guard| {
264
+ t. into_iter( ) . filter_map( |i| guard. run( || map( i) ) ) . collect( )
265
+ } )
262
266
}
263
267
264
268
pub use std:: rc:: Rc as Lrc ;
@@ -313,8 +317,6 @@ cfg_if! {
313
317
}
314
318
}
315
319
} else {
316
- use parking_lot:: Mutex ;
317
-
318
320
pub use std:: marker:: Send as Send ;
319
321
pub use std:: marker:: Sync as Sync ;
320
322
@@ -380,7 +382,12 @@ cfg_if! {
380
382
let ( a, b) = rayon:: join( move || FromDyn :: from( oper_a. into_inner( ) ( ) ) , move || FromDyn :: from( oper_b. into_inner( ) ( ) ) ) ;
381
383
( a. into_inner( ) , b. into_inner( ) )
382
384
} else {
383
- ( oper_a( ) , oper_b( ) )
385
+ let ( a, b) = parallel_guard( |guard| {
386
+ let a = guard. run( oper_a) ;
387
+ let b = guard. run( oper_b) ;
388
+ ( a, b)
389
+ } ) ;
390
+ ( a. unwrap( ) , b. unwrap( ) )
384
391
}
385
392
}
386
393
@@ -415,28 +422,10 @@ cfg_if! {
415
422
// of a single threaded rustc.
416
423
parallel!( impl $fblock [ ] [ $( $blocks) , * ] ) ;
417
424
} else {
418
- // We catch panics here ensuring that all the blocks execute.
419
- // This makes behavior consistent with the parallel compiler.
420
- let mut panic = None ;
421
- if let Err ( p) = :: std:: panic:: catch_unwind(
422
- :: std:: panic:: AssertUnwindSafe ( || $fblock)
423
- ) {
424
- if panic. is_none( ) {
425
- panic = Some ( p) ;
426
- }
427
- }
428
- $(
429
- if let Err ( p) = :: std:: panic:: catch_unwind(
430
- :: std:: panic:: AssertUnwindSafe ( || $blocks)
431
- ) {
432
- if panic. is_none( ) {
433
- panic = Some ( p) ;
434
- }
435
- }
436
- ) *
437
- if let Some ( panic) = panic {
438
- :: std:: panic:: resume_unwind( panic) ;
439
- }
425
+ $crate :: sync:: parallel_guard( |guard| {
426
+ guard. run( || $fblock) ;
427
+ $( guard. run( || $blocks) ; ) *
428
+ } ) ;
440
429
}
441
430
} ;
442
431
}
@@ -447,34 +436,18 @@ cfg_if! {
447
436
t: T ,
448
437
for_each: impl Fn ( I ) + DynSync + DynSend
449
438
) {
450
- if mode:: is_dyn_thread_safe( ) {
451
- let for_each = FromDyn :: from( for_each) ;
452
- let panic: Mutex <Option <_>> = Mutex :: new( None ) ;
453
- t. into_par_iter( ) . for_each( |i| if let Err ( p) = catch_unwind( AssertUnwindSafe ( || for_each( i) ) ) {
454
- let mut l = panic. lock( ) ;
455
- if l. is_none( ) {
456
- * l = Some ( p)
457
- }
458
- } ) ;
459
-
460
- if let Some ( panic) = panic. into_inner( ) {
461
- resume_unwind( panic) ;
462
- }
463
- } else {
464
- // We catch panics here ensuring that all the loop iterations execute.
465
- // This makes behavior consistent with the parallel compiler.
466
- let mut panic = None ;
467
- t. into_iter( ) . for_each( |i| {
468
- if let Err ( p) = catch_unwind( AssertUnwindSafe ( || for_each( i) ) ) {
469
- if panic. is_none( ) {
470
- panic = Some ( p) ;
471
- }
472
- }
473
- } ) ;
474
- if let Some ( panic) = panic {
475
- resume_unwind( panic) ;
439
+ parallel_guard( |guard| {
440
+ if mode:: is_dyn_thread_safe( ) {
441
+ let for_each = FromDyn :: from( for_each) ;
442
+ t. into_par_iter( ) . for_each( |i| {
443
+ guard. run( || for_each( i) ) ;
444
+ } ) ;
445
+ } else {
446
+ t. into_iter( ) . for_each( |i| {
447
+ guard. run( || for_each( i) ) ;
448
+ } ) ;
476
449
}
477
- }
450
+ } ) ;
478
451
}
479
452
480
453
pub fn par_map<
@@ -486,46 +459,14 @@ cfg_if! {
486
459
t: T ,
487
460
map: impl Fn ( I ) -> R + DynSync + DynSend
488
461
) -> C {
489
- if mode:: is_dyn_thread_safe( ) {
490
- let panic: Mutex <Option <_>> = Mutex :: new( None ) ;
491
- let map = FromDyn :: from( map) ;
492
- // We catch panics here ensuring that all the loop iterations execute.
493
- let r = t. into_par_iter( ) . filter_map( |i| {
494
- match catch_unwind( AssertUnwindSafe ( || map( i) ) ) {
495
- Ok ( r) => Some ( r) ,
496
- Err ( p) => {
497
- let mut l = panic. lock( ) ;
498
- if l. is_none( ) {
499
- * l = Some ( p) ;
500
- }
501
- None
502
- } ,
503
- }
504
- } ) . collect( ) ;
505
-
506
- if let Some ( panic) = panic. into_inner( ) {
507
- resume_unwind( panic) ;
508
- }
509
- r
510
- } else {
511
- // We catch panics here ensuring that all the loop iterations execute.
512
- let mut panic = None ;
513
- let r = t. into_iter( ) . filter_map( |i| {
514
- match catch_unwind( AssertUnwindSafe ( || map( i) ) ) {
515
- Ok ( r) => Some ( r) ,
516
- Err ( p) => {
517
- if panic. is_none( ) {
518
- panic = Some ( p) ;
519
- }
520
- None
521
- }
522
- }
523
- } ) . collect( ) ;
524
- if let Some ( panic) = panic {
525
- resume_unwind( panic) ;
462
+ parallel_guard( |guard| {
463
+ if mode:: is_dyn_thread_safe( ) {
464
+ let map = FromDyn :: from( map) ;
465
+ t. into_par_iter( ) . filter_map( |i| guard. run( || map( i) ) ) . collect( )
466
+ } else {
467
+ t. into_iter( ) . filter_map( |i| guard. run( || map( i) ) ) . collect( )
526
468
}
527
- r
528
- }
469
+ } )
529
470
}
530
471
531
472
/// This makes locks panic if they are already held.
0 commit comments