rustc_thread_pool/join/mod.rs

use std::sync::atomic::{AtomicBool, Ordering};

use crate::job::StackJob;
use crate::latch::SpinLatch;
use crate::{FnContext, registry, tlv, unwind};

#[cfg(test)]
mod tests;

/// Takes two closures and *potentially* runs them in parallel. It
/// returns a pair of the results from those closures.
///
/// Conceptually, calling `join()` is similar to spawning two threads,
/// one executing each of the two closures. However, the
/// implementation is quite different and incurs very low
/// overhead. The underlying technique is called "work stealing": the
/// Rayon runtime uses a fixed pool of worker threads and attempts to
/// only execute code in parallel when there are idle CPUs to handle
/// it.
///
/// When `join` is called from outside the thread pool, the calling
/// thread will block while the closures execute in the pool. When
/// `join` is called within the pool, the calling thread still actively
/// participates in the thread pool. It will begin by executing closure
/// A (on the current thread). While it is doing that, it will advertise
/// closure B as being available for other threads to execute. Once closure A
/// has completed, the current thread will try to execute closure B;
/// if, however, closure B has been stolen, then it will look for other work
/// while waiting for the thief to fully execute closure B. (This is the
/// typical work-stealing strategy.)
///
/// # Examples
///
/// This example uses join to perform a quick-sort (note this is not a
/// particularly optimized implementation: if you **actually** want to
/// sort for real, you should prefer [the `par_sort` method] offered
/// by Rayon).
///
/// [the `par_sort` method]: ../rayon/slice/trait.ParallelSliceMut.html#method.par_sort
///
/// ```rust
/// # use rustc_thread_pool as rayon;
/// let mut v = vec![5, 1, 8, 22, 0, 44];
/// quick_sort(&mut v);
/// assert_eq!(v, vec![0, 1, 5, 8, 22, 44]);
///
/// fn quick_sort<T: PartialOrd + Send>(v: &mut [T]) {
///     if v.len() > 1 {
///         let mid = partition(v);
///         let (lo, hi) = v.split_at_mut(mid);
///         rayon::join(|| quick_sort(lo),
///                     || quick_sort(hi));
///     }
/// }
///
/// // Partition rearranges all items `<=` to the pivot
/// // item (arbitrarily selected to be the last item in the slice)
/// // to the first half of the slice. It then returns the
/// // "dividing point" where the pivot is placed.
/// fn partition<T: PartialOrd + Send>(v: &mut [T]) -> usize {
///     let pivot = v.len() - 1;
///     let mut i = 0;
///     for j in 0..pivot {
///         if v[j] <= v[pivot] {
///             v.swap(i, j);
///             i += 1;
///         }
///     }
///     v.swap(i, pivot);
///     i
/// }
/// ```
///
/// # Warning about blocking I/O
///
/// The assumption is that the closures given to `join()` are
/// CPU-bound tasks that do not perform I/O or other blocking
/// operations. If you do perform I/O, and that I/O blocks
/// (e.g., waiting for a network request), the overall performance may
/// be poor. Moreover, if you cause one closure to be blocked waiting
/// on another (for example, using a channel), that could lead to a
/// deadlock.
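///
/// For instance, the following sketch (marked `ignore`, since it is an
/// anti-pattern and may hang) can deadlock: closure A blocks on `recv()`,
/// while closure B only runs if some other thread steals it.
///
/// ```rust,ignore
/// # use rustc_thread_pool as rayon;
/// use std::sync::mpsc::channel;
///
/// let (tx, rx) = channel();
/// rayon::join(
///     || rx.recv().unwrap(),   // blocks waiting for the send below...
///     || tx.send(42).unwrap(), // ...which may never run if it is not stolen
/// );
/// ```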
///
/// # Panics
///
/// No matter what happens, both closures will always be executed. If
/// a single closure panics, whether it be the first or second
/// closure, that panic will be propagated and hence `join()` will
/// panic with the same panic value. If both closures panic, `join()`
/// will panic with the panic value from the first closure.
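///
/// A minimal sketch of that guarantee (a `should_panic` doctest; the
/// panic value from the second closure is re-raised by `join` itself):
///
/// ```rust,should_panic
/// # use rustc_thread_pool as rayon;
/// rayon::join(
///     || println!("closure A runs to completion"),
///     || panic!("boom"), // propagated out of `join` once both have run
/// );
/// ```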
pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
where
    A: FnOnce() -> RA + Send,
    B: FnOnce() -> RB + Send,
    RA: Send,
    RB: Send,
{
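    // Adapt each plain closure to the `FnContext`-taking form expected by
    // `join_context`; the context itself is ignored.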
    #[inline]
    fn call<R>(f: impl FnOnce() -> R) -> impl FnOnce(FnContext) -> R {
        move |_| f()
    }

    join_context(call(oper_a), call(oper_b))
}

/// Identical to `join`, except that the closures have a parameter
/// that provides context for the way the closure has been called,
/// especially indicating whether they're executing on a different
/// thread than where `join_context` was called. This will occur if
/// the second job is stolen by a different thread, or if
/// `join_context` was called from outside the thread pool to begin
/// with.
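///
/// # Examples
///
/// A minimal sketch (this assumes a `FnContext::migrated` accessor, as
/// in `rayon-core`; marked `ignore` so it is not run as a doctest):
///
/// ```rust,ignore
/// # use rustc_thread_pool as rayon;
/// let (a_migrated, b_migrated) = rayon::join_context(
///     // Closure A runs on the calling thread; it is only flagged as
///     // migrated when the call was injected from outside the pool.
///     |ctx| ctx.migrated(),
///     // Closure B is flagged as migrated if another thread stole it.
///     |ctx| ctx.migrated(),
/// );
/// println!("a: {a_migrated}, b: {b_migrated}");
/// ```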
pub fn join_context<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
where
    A: FnOnce(FnContext) -> RA + Send,
    B: FnOnce(FnContext) -> RB + Send,
    RA: Send,
    RB: Send,
{
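    // Adapters from the `FnContext`-taking closures to plain closures:
    // closure A's `injected` flag is known at call time, whereas closure B
    // learns whether it migrated only at the moment it actually runs.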
    #[inline]
    fn call_a<R>(f: impl FnOnce(FnContext) -> R, injected: bool) -> impl FnOnce() -> R {
        move || f(FnContext::new(injected))
    }

    #[inline]
    fn call_b<R>(f: impl FnOnce(FnContext) -> R) -> impl FnOnce(bool) -> R {
        move |migrated| f(FnContext::new(migrated))
    }

    registry::in_worker(|worker_thread, injected| unsafe {
        let tlv = tlv::get();
        // Create virtual wrapper for task b; this all has to be
        // done here so that the stack frame can keep it all live
        // long enough.
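        // `job_b_started` records whether job b has begun running,
        // whether inline on this thread or on a thief; it is consulted
        // by `wait_for_jobs` below.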
        let job_b_started = AtomicBool::new(false);
        let job_b = StackJob::new(
            tlv,
            |migrated| {
                job_b_started.store(true, Ordering::Relaxed);
                call_b(oper_b)(migrated)
            },
            SpinLatch::new(worker_thread),
        );
        let job_b_ref = job_b.as_job_ref();
        let job_b_id = job_b_ref.id();
        worker_thread.push(job_b_ref);

        // Execute task a; hopefully b gets stolen in the meantime.
        let status_a = unwind::halt_unwinding(call_a(oper_a, injected));
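        // Wait until job b completes. While waiting, keep this thread busy:
        // if job b is still in the local deque, the closure below pops and
        // runs it inline; if it was stolen, execute other pending work until
        // the thief releases job b's latch.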
        worker_thread.wait_for_jobs::<_, false>(
            &job_b.latch,
            || job_b_started.load(Ordering::Relaxed),
            |job| job.id() == job_b_id,
            |job| {
                debug_assert_eq!(job.id(), job_b_id);
                job_b.run_inline(injected);
            },
        );

        // Restore the TLV since we might have run some jobs overwriting it when waiting for job b.
        tlv::set(tlv);

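        // Job b is guaranteed complete at this point, so propagating a
        // panic from job a here preserves the documented guarantee that
        // both closures always run.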
        let result_a = match status_a {
            Ok(v) => v,
            Err(err) => unwind::resume_unwinding(err),
        };
        (result_a, job_b.into_result())
    })
}