rustc_data_structures/sync/
parallel.rs

//! This module defines parallel operations that are implemented in
//! one way for the serial compiler, and another way for the parallel compiler.
3
4#![allow(dead_code)]
5
6use std::any::Any;
7use std::panic::{AssertUnwindSafe, catch_unwind, resume_unwind};
8
9use parking_lot::Mutex;
10
11use crate::FatalErrorMarker;
12use crate::sync::{DynSend, DynSync, FromDyn, IntoDynSyncSend, mode};
13
14/// A guard used to hold panics that occur during a parallel section to later by unwound.
15/// This is used for the parallel compiler to prevent fatal errors from non-deterministically
16/// hiding errors by ensuring that everything in the section has completed executing before
17/// continuing with unwinding. It's also used for the non-parallel code to ensure error message
18/// output match the parallel compiler for testing purposes.
19pub struct ParallelGuard {
20    panic: Mutex<Option<IntoDynSyncSend<Box<dyn Any + Send + 'static>>>>,
21}
22
23impl ParallelGuard {
24    pub fn run<R>(&self, f: impl FnOnce() -> R) -> Option<R> {
25        catch_unwind(AssertUnwindSafe(f))
26            .map_err(|err| {
27                let mut panic = self.panic.lock();
28                if panic.is_none() || !(*err).is::<FatalErrorMarker>() {
29                    *panic = Some(IntoDynSyncSend(err));
30                }
31            })
32            .ok()
33    }
34}
35
36/// This gives access to a fresh parallel guard in the closure and will unwind any panics
37/// caught in it after the closure returns.
38#[inline]
39pub fn parallel_guard<R>(f: impl FnOnce(&ParallelGuard) -> R) -> R {
40    let guard = ParallelGuard { panic: Mutex::new(None) };
41    let ret = f(&guard);
42    if let Some(IntoDynSyncSend(panic)) = guard.panic.into_inner() {
43        resume_unwind(panic);
44    }
45    ret
46}
47
48fn serial_join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
49where
50    A: FnOnce() -> RA,
51    B: FnOnce() -> RB,
52{
53    let (a, b) = parallel_guard(|guard| {
54        let a = guard.run(oper_a);
55        let b = guard.run(oper_b);
56        (a, b)
57    });
58    (a.unwrap(), b.unwrap())
59}
60
/// Runs a list of blocks in parallel. The first block is executed immediately on
/// the current thread. Use that for the longest running block.
///
/// Every block is executed through `ParallelGuard::run` inside a single
/// `parallel_guard` call. When `is_dyn_thread_safe()` is false the blocks are simply
/// run one after another in source order on the current thread.
///
/// The `impl` arms are internal helpers; they are invoked through `$crate::parallel!`
/// so that the macro also expands correctly when it is called by path from a crate
/// that has no `parallel` macro in scope.
#[macro_export]
macro_rules! parallel {
    // Internal: move the next pending block to the *front* of the accumulated list,
    // which reverses the order of the later blocks.
    (impl $fblock:block [$($done:expr,)*] [$next:expr $(, $rest:expr)*]) => {
        $crate::parallel!(impl $fblock [$next, $($done,)*] [$($rest),*])
    };
    // Internal: nothing left to reverse. Spawn every accumulated block on the scope,
    // then run the first block on the current thread.
    (impl $fblock:block [$($blocks:expr,)*] []) => {
        $crate::sync::parallel_guard(|guard| {
            $crate::sync::scope(|s| {
                $(
                    let block = $crate::sync::FromDyn::from(|| $blocks);
                    s.spawn(move |_| {
                        guard.run(move || block.into_inner()());
                    });
                )*
                guard.run(|| $fblock);
            });
        });
    };
    // Entry point.
    ($fblock:block, $($blocks:block),*) => {
        if $crate::sync::is_dyn_thread_safe() {
            // Reverse the order of the later blocks since Rayon executes them in reverse order
            // when using a single thread. This ensures the execution order matches that
            // of a single threaded rustc.
            $crate::parallel!(impl $fblock [] [$($blocks),*]);
        } else {
            $crate::sync::parallel_guard(|guard| {
                guard.run(|| $fblock);
                $(guard.run(|| $blocks);)*
            });
        }
    };
}
95
96pub fn spawn(func: impl FnOnce() + DynSend + 'static) {
97    if mode::is_dyn_thread_safe() {
98        let func = FromDyn::from(func);
99        rayon_core::spawn(|| {
100            (func.into_inner())();
101        });
102    } else {
103        func()
104    }
105}
106
107// This function only works when `mode::is_dyn_thread_safe()`.
108pub fn scope<'scope, OP, R>(op: OP) -> R
109where
110    OP: FnOnce(&rayon_core::Scope<'scope>) -> R + DynSend,
111    R: DynSend,
112{
113    let op = FromDyn::from(op);
114    rayon_core::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner()
115}
116
117#[inline]
118pub fn join<A, B, RA: DynSend, RB: DynSend>(oper_a: A, oper_b: B) -> (RA, RB)
119where
120    A: FnOnce() -> RA + DynSend,
121    B: FnOnce() -> RB + DynSend,
122{
123    if mode::is_dyn_thread_safe() {
124        let oper_a = FromDyn::from(oper_a);
125        let oper_b = FromDyn::from(oper_b);
126        let (a, b) = parallel_guard(|guard| {
127            rayon_core::join(
128                move || guard.run(move || FromDyn::from(oper_a.into_inner()())),
129                move || guard.run(move || FromDyn::from(oper_b.into_inner()())),
130            )
131        });
132        (a.unwrap().into_inner(), b.unwrap().into_inner())
133    } else {
134        serial_join(oper_a, oper_b)
135    }
136}
137
138fn par_slice<I: DynSend>(
139    items: &mut [I],
140    guard: &ParallelGuard,
141    for_each: impl Fn(&mut I) + DynSync + DynSend,
142) {
143    struct State<'a, F> {
144        for_each: FromDyn<F>,
145        guard: &'a ParallelGuard,
146        group: usize,
147    }
148
149    fn par_rec<I: DynSend, F: Fn(&mut I) + DynSync + DynSend>(
150        items: &mut [I],
151        state: &State<'_, F>,
152    ) {
153        if items.len() <= state.group {
154            for item in items {
155                state.guard.run(|| (state.for_each)(item));
156            }
157        } else {
158            let (left, right) = items.split_at_mut(items.len() / 2);
159            let mut left = state.for_each.derive(left);
160            let mut right = state.for_each.derive(right);
161            rayon_core::join(move || par_rec(*left, state), move || par_rec(*right, state));
162        }
163    }
164
165    let state = State {
166        for_each: FromDyn::from(for_each),
167        guard,
168        group: std::cmp::max(items.len() / 128, 1),
169    };
170    par_rec(items, &state)
171}
172
173pub fn par_for_each_in<I: DynSend, T: IntoIterator<Item = I>>(
174    t: T,
175    for_each: impl Fn(&I) + DynSync + DynSend,
176) {
177    parallel_guard(|guard| {
178        if mode::is_dyn_thread_safe() {
179            let mut items: Vec<_> = t.into_iter().collect();
180            par_slice(&mut items, guard, |i| for_each(&*i))
181        } else {
182            t.into_iter().for_each(|i| {
183                guard.run(|| for_each(&i));
184            });
185        }
186    });
187}
188
189/// This runs `for_each` in parallel for each iterator item. If one or more of the
190/// `for_each` calls returns `Err`, the function will also return `Err`. The error returned
191/// will be non-deterministic, but this is expected to be used with `ErrorGuaranteed` which
192/// are all equivalent.
193pub fn try_par_for_each_in<T: IntoIterator, E: DynSend>(
194    t: T,
195    for_each: impl Fn(&<T as IntoIterator>::Item) -> Result<(), E> + DynSync + DynSend,
196) -> Result<(), E>
197where
198    <T as IntoIterator>::Item: DynSend,
199{
200    parallel_guard(|guard| {
201        if mode::is_dyn_thread_safe() {
202            let mut items: Vec<_> = t.into_iter().collect();
203
204            let error = Mutex::new(None);
205
206            par_slice(&mut items, guard, |i| {
207                if let Err(err) = for_each(&*i) {
208                    *error.lock() = Some(err);
209                }
210            });
211
212            if let Some(err) = error.into_inner() { Err(err) } else { Ok(()) }
213        } else {
214            t.into_iter().filter_map(|i| guard.run(|| for_each(&i))).fold(Ok(()), Result::and)
215        }
216    })
217}
218
219pub fn par_map<I: DynSend, T: IntoIterator<Item = I>, R: DynSend, C: FromIterator<R>>(
220    t: T,
221    map: impl Fn(I) -> R + DynSync + DynSend,
222) -> C {
223    parallel_guard(|guard| {
224        if mode::is_dyn_thread_safe() {
225            let map = FromDyn::from(map);
226
227            let mut items: Vec<(Option<I>, Option<R>)> =
228                t.into_iter().map(|i| (Some(i), None)).collect();
229
230            par_slice(&mut items, guard, |i| {
231                i.1 = Some(map(i.0.take().unwrap()));
232            });
233
234            items.into_iter().filter_map(|i| i.1).collect()
235        } else {
236            t.into_iter().filter_map(|i| guard.run(|| map(i))).collect()
237        }
238    })
239}
240
241pub fn broadcast<R: DynSend>(op: impl Fn(usize) -> R + DynSync) -> Vec<R> {
242    if mode::is_dyn_thread_safe() {
243        let op = FromDyn::from(op);
244        let results = rayon_core::broadcast(|context| op.derive(op(context.index())));
245        results.into_iter().map(|r| r.into_inner()).collect()
246    } else {
247        vec![op(0)]
248    }
249}