rustc_thread_pool/scope/
mod.rs

1//! Methods for custom fork-join scopes, created by the [`scope()`]
2//! and [`in_place_scope()`] functions. These are a more flexible alternative to [`join()`].
3//!
4//! [`scope()`]: fn.scope.html
5//! [`in_place_scope()`]: fn.in_place_scope.html
6//! [`join()`]: ../join/join.fn.html
7
8use std::any::Any;
9use std::marker::PhantomData;
10use std::mem::ManuallyDrop;
11use std::sync::Arc;
12use std::sync::atomic::{AtomicPtr, Ordering};
13use std::{fmt, ptr};
14
15use crate::broadcast::BroadcastContext;
16use crate::job::{ArcJob, HeapJob, JobFifo, JobRef};
17use crate::latch::{CountLatch, Latch};
18use crate::registry::{Registry, WorkerThread, global_registry, in_worker};
19use crate::tlv::{self, Tlv};
20use crate::unwind;
21
22#[cfg(test)]
23mod tests;
24
25/// Represents a fork-join scope which can be used to spawn any number of tasks.
26/// See [`scope()`] for more information.
27///
28///[`scope()`]: fn.scope.html
29pub struct Scope<'scope> {
30    base: ScopeBase<'scope>,
31}
32
33/// Represents a fork-join scope which can be used to spawn any number of tasks.
34/// Those spawned from the same thread are prioritized in relative FIFO order.
35/// See [`scope_fifo()`] for more information.
36///
37///[`scope_fifo()`]: fn.scope_fifo.html
38pub struct ScopeFifo<'scope> {
39    base: ScopeBase<'scope>,
40    fifos: Vec<JobFifo>,
41}
42
43struct ScopeBase<'scope> {
44    /// thread registry where `scope()` was executed or where `in_place_scope()`
45    /// should spawn jobs.
46    registry: Arc<Registry>,
47
48    /// if some job panicked, the error is stored here; it will be
49    /// propagated to the one who created the scope
50    panic: AtomicPtr<Box<dyn Any + Send + 'static>>,
51
52    /// latch to track job counts
53    job_completed_latch: CountLatch,
54
55    /// You can think of a scope as containing a list of closures to execute,
56    /// all of which outlive `'scope`. They're not actually required to be
57    /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because
58    /// the closures are only *moved* across threads to be executed.
59    #[allow(clippy::type_complexity)]
60    marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>,
61
62    /// The TLV at the scope's creation. Used to set the TLV for spawned jobs.
63    tlv: Tlv,
64}
65
66/// Creates a "fork-join" scope `s` and invokes the closure with a
67/// reference to `s`. This closure can then spawn asynchronous tasks
68/// into `s`. Those tasks may run asynchronously with respect to the
69/// closure; they may themselves spawn additional tasks into `s`. When
70/// the closure returns, it will block until all tasks that have been
71/// spawned into `s` complete.
72///
73/// `scope()` is a more flexible building block compared to `join()`,
74/// since a loop can be used to spawn any number of tasks without
75/// recursing. However, that flexibility comes at a performance price:
76/// tasks spawned using `scope()` must be allocated onto the heap,
77/// whereas `join()` can make exclusive use of the stack. **Prefer
78/// `join()` (or, even better, parallel iterators) where possible.**
79///
80/// # Example
81///
82/// The Rayon `join()` function launches two closures and waits for them
83/// to stop. One could implement `join()` using a scope like so, although
84/// it would be less efficient than the real implementation:
85///
86/// ```rust
87/// # use rustc_thread_pool as rayon;
88/// pub fn join<A,B,RA,RB>(oper_a: A, oper_b: B) -> (RA, RB)
89///     where A: FnOnce() -> RA + Send,
90///           B: FnOnce() -> RB + Send,
91///           RA: Send,
92///           RB: Send,
93/// {
94///     let mut result_a: Option<RA> = None;
95///     let mut result_b: Option<RB> = None;
96///     rayon::scope(|s| {
97///         s.spawn(|_| result_a = Some(oper_a()));
98///         s.spawn(|_| result_b = Some(oper_b()));
99///     });
100///     (result_a.unwrap(), result_b.unwrap())
101/// }
102/// ```
103///
104/// # A note on threading
105///
106/// The closure given to `scope()` executes in the Rayon thread-pool,
107/// as do those given to `spawn()`. This means that you can't access
108/// thread-local variables (well, you can, but they may have
109/// unexpected values).
110///
111/// # Task execution
112///
113/// Task execution potentially starts as soon as `spawn()` is called.
114/// The task will end sometime before `scope()` returns. Note that the
115/// *closure* given to scope may return much earlier. In general
116/// the lifetime of a scope created like `scope(body)` goes something like this:
117///
118/// - Scope begins when `scope(body)` is called
119/// - Scope body `body()` is invoked
120///     - Scope tasks may be spawned
121/// - Scope body returns
122/// - Scope tasks execute, possibly spawning more tasks
123/// - Once all tasks are done, scope ends and `scope()` returns
124///
125/// To see how and when tasks are joined, consider this example:
126///
127/// ```rust
128/// # use rustc_thread_pool as rayon;
129/// // point start
130/// rayon::scope(|s| {
131///     s.spawn(|s| { // task s.1
132///         s.spawn(|s| { // task s.1.1
133///             rayon::scope(|t| {
134///                 t.spawn(|_| ()); // task t.1
135///                 t.spawn(|_| ()); // task t.2
136///             });
137///         });
138///     });
139///     s.spawn(|s| { // task s.2
140///     });
141///     // point mid
142/// });
143/// // point end
144/// ```
145///
146/// The various tasks that are run will execute roughly like so:
147///
148/// ```notrust
149/// | (start)
150/// |
151/// | (scope `s` created)
152/// +-----------------------------------------------+ (task s.2)
153/// +-------+ (task s.1)                            |
154/// |       |                                       |
155/// |       +---+ (task s.1.1)                      |
156/// |       |   |                                   |
157/// |       |   | (scope `t` created)               |
158/// |       |   +----------------+ (task t.2)       |
159/// |       |   +---+ (task t.1) |                  |
160/// | (mid) |   |   |            |                  |
161/// :       |   + <-+------------+ (scope `t` ends) |
162/// :       |   |                                   |
163/// |<------+---+-----------------------------------+ (scope `s` ends)
164/// |
165/// | (end)
166/// ```
167///
168/// The point here is that everything spawned into scope `s` will
169/// terminate (at latest) at the same point -- right before the
170/// original call to `rayon::scope` returns. This includes new
171/// subtasks created by other subtasks (e.g., task `s.1.1`). If a new
172/// scope is created (such as `t`), the things spawned into that scope
173/// will be joined before that scope returns, which in turn occurs
174/// before the creating task (task `s.1.1` in this case) finishes.
175///
176/// There is no guaranteed order of execution for spawns in a scope,
177/// given that other threads may steal tasks at any time. However, they
178/// are generally prioritized in a LIFO order on the thread from which
179/// they were spawned. So in this example, absent any stealing, we can
180/// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other
181/// threads always steal from the other end of the deque, like FIFO
182/// order. The idea is that "recent" tasks are most likely to be fresh
183/// in the local CPU's cache, while other threads can steal older
184/// "stale" tasks. For an alternate approach, consider
185/// [`scope_fifo()`] instead.
186///
187/// [`scope_fifo()`]: fn.scope_fifo.html
188///
189/// # Accessing stack data
190///
191/// In general, spawned tasks may access stack data in place that
192/// outlives the scope itself. Other data must be fully owned by the
193/// spawned task.
194///
195/// ```rust
196/// # use rustc_thread_pool as rayon;
197/// let ok: Vec<i32> = vec![1, 2, 3];
198/// rayon::scope(|s| {
199///     let bad: Vec<i32> = vec![4, 5, 6];
200///     s.spawn(|_| {
///         // We can access `ok` because it outlives the scope `s`.
202///         println!("ok: {:?}", ok);
203///
204///         // If we just try to use `bad` here, the closure will borrow `bad`
205///         // (because we are just printing it out, and that only requires a
206///         // borrow), which will result in a compilation error. Read on
207///         // for options.
208///         // println!("bad: {:?}", bad);
209///    });
210/// });
211/// ```
212///
/// As the comments in the example above suggest, to reference `bad` we must
214/// take ownership of it. One way to do this is to detach the closure
215/// from the surrounding stack frame, using the `move` keyword. This
216/// will cause it to take ownership of *all* the variables it touches,
217/// in this case including both `ok` *and* `bad`:
218///
219/// ```rust
220/// # use rustc_thread_pool as rayon;
221/// let ok: Vec<i32> = vec![1, 2, 3];
222/// rayon::scope(|s| {
223///     let bad: Vec<i32> = vec![4, 5, 6];
224///     s.spawn(move |_| {
225///         println!("ok: {:?}", ok);
226///         println!("bad: {:?}", bad);
227///     });
228///
229///     // That closure is fine, but now we can't use `ok` anywhere else,
230///     // since it is owned by the previous task:
231///     // s.spawn(|_| println!("ok: {:?}", ok));
232/// });
233/// ```
234///
235/// While this works, it could be a problem if we want to use `ok` elsewhere.
236/// There are two choices. We can keep the closure as a `move` closure, but
237/// instead of referencing the variable `ok`, we create a shadowed variable that
238/// is a borrow of `ok` and capture *that*:
239///
240/// ```rust
241/// # use rustc_thread_pool as rayon;
242/// let ok: Vec<i32> = vec![1, 2, 3];
243/// rayon::scope(|s| {
244///     let bad: Vec<i32> = vec![4, 5, 6];
245///     let ok: &Vec<i32> = &ok; // shadow the original `ok`
246///     s.spawn(move |_| {
247///         println!("ok: {:?}", ok); // captures the shadowed version
248///         println!("bad: {:?}", bad);
249///     });
250///
251///     // Now we too can use the shadowed `ok`, since `&Vec<i32>` references
252///     // can be shared freely. Note that we need a `move` closure here though,
253///     // because otherwise we'd be trying to borrow the shadowed `ok`,
254///     // and that doesn't outlive `scope`.
255///     s.spawn(move |_| println!("ok: {:?}", ok));
256/// });
257/// ```
258///
259/// Another option is not to use the `move` keyword but instead to take ownership
260/// of individual variables:
261///
262/// ```rust
263/// # use rustc_thread_pool as rayon;
264/// let ok: Vec<i32> = vec![1, 2, 3];
265/// rayon::scope(|s| {
266///     let bad: Vec<i32> = vec![4, 5, 6];
267///     s.spawn(|_| {
268///         // Transfer ownership of `bad` into a local variable (also named `bad`).
269///         // This will force the closure to take ownership of `bad` from the environment.
270///         let bad = bad;
271///         println!("ok: {:?}", ok); // `ok` is only borrowed.
272///         println!("bad: {:?}", bad); // refers to our local variable, above.
273///     });
274///
275///     s.spawn(|_| println!("ok: {:?}", ok)); // we too can borrow `ok`
276/// });
277/// ```
278///
279/// # Panics
280///
281/// If a panic occurs, either in the closure given to `scope()` or in
282/// any of the spawned jobs, that panic will be propagated and the
/// call to `scope()` will panic. If multiple panics occur, it is
284/// non-deterministic which of their panic values will propagate.
285/// Regardless, once a task is spawned using `scope.spawn()`, it will
286/// execute, even if the spawning task should later panic. `scope()`
287/// returns once all spawned jobs have completed, and any panics are
288/// propagated at that point.
289pub fn scope<'scope, OP, R>(op: OP) -> R
290where
291    OP: FnOnce(&Scope<'scope>) -> R + Send,
292    R: Send,
293{
294    in_worker(|owner_thread, _| {
295        let scope = Scope::<'scope>::new(Some(owner_thread), None);
296        scope.base.complete(Some(owner_thread), || op(&scope))
297    })
298}
299
300/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
301/// closure with a reference to `s`. This closure can then spawn
302/// asynchronous tasks into `s`. Those tasks may run asynchronously with
303/// respect to the closure; they may themselves spawn additional tasks
304/// into `s`. When the closure returns, it will block until all tasks
305/// that have been spawned into `s` complete.
306///
307/// # Task execution
308///
309/// Tasks in a `scope_fifo()` run similarly to [`scope()`], but there's a
310/// difference in the order of execution. Consider a similar example:
311///
312/// [`scope()`]: fn.scope.html
313///
314/// ```rust
315/// # use rustc_thread_pool as rayon;
316/// // point start
317/// rayon::scope_fifo(|s| {
318///     s.spawn_fifo(|s| { // task s.1
319///         s.spawn_fifo(|s| { // task s.1.1
320///             rayon::scope_fifo(|t| {
321///                 t.spawn_fifo(|_| ()); // task t.1
322///                 t.spawn_fifo(|_| ()); // task t.2
323///             });
324///         });
325///     });
326///     s.spawn_fifo(|s| { // task s.2
327///     });
328///     // point mid
329/// });
330/// // point end
331/// ```
332///
333/// The various tasks that are run will execute roughly like so:
334///
335/// ```notrust
336/// | (start)
337/// |
338/// | (FIFO scope `s` created)
339/// +--------------------+ (task s.1)
340/// +-------+ (task s.2) |
341/// |       |            +---+ (task s.1.1)
342/// |       |            |   |
343/// |       |            |   | (FIFO scope `t` created)
344/// |       |            |   +----------------+ (task t.1)
345/// |       |            |   +---+ (task t.2) |
346/// | (mid) |            |   |   |            |
347/// :       |            |   + <-+------------+ (scope `t` ends)
348/// :       |            |   |
349/// |<------+------------+---+ (scope `s` ends)
350/// |
351/// | (end)
352/// ```
353///
354/// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on
355/// the thread from which they were spawned, as opposed to `scope()`'s
356/// LIFO. So in this example, we can expect `s.1` to execute before
357/// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in
358/// FIFO order, as usual. Overall, this has roughly the same order as
359/// the now-deprecated [`breadth_first`] option, except the effect is
360/// isolated to a particular scope. If spawns are intermingled from any
361/// combination of `scope()` and `scope_fifo()`, or from different
362/// threads, their order is only specified with respect to spawns in the
363/// same scope and thread.
364///
365/// For more details on this design, see Rayon [RFC #1].
366///
367/// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first
368/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
369///
370/// # Panics
371///
372/// If a panic occurs, either in the closure given to `scope_fifo()` or
373/// in any of the spawned jobs, that panic will be propagated and the
/// call to `scope_fifo()` will panic. If multiple panics occur, it is
375/// non-deterministic which of their panic values will propagate.
376/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it
377/// will execute, even if the spawning task should later panic.
378/// `scope_fifo()` returns once all spawned jobs have completed, and any
379/// panics are propagated at that point.
380pub fn scope_fifo<'scope, OP, R>(op: OP) -> R
381where
382    OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
383    R: Send,
384{
385    in_worker(|owner_thread, _| {
386        let scope = ScopeFifo::<'scope>::new(Some(owner_thread), None);
387        scope.base.complete(Some(owner_thread), || op(&scope))
388    })
389}
390
391/// Creates a "fork-join" scope `s` and invokes the closure with a
392/// reference to `s`. This closure can then spawn asynchronous tasks
393/// into `s`. Those tasks may run asynchronously with respect to the
394/// closure; they may themselves spawn additional tasks into `s`. When
395/// the closure returns, it will block until all tasks that have been
396/// spawned into `s` complete.
397///
398/// This is just like `scope()` except the closure runs on the same thread
399/// that calls `in_place_scope()`. Only work that it spawns runs in the
400/// thread pool.
401///
402/// # Panics
403///
404/// If a panic occurs, either in the closure given to `in_place_scope()` or in
405/// any of the spawned jobs, that panic will be propagated and the
/// call to `in_place_scope()` will panic. If multiple panics occur, it is
407/// non-deterministic which of their panic values will propagate.
408/// Regardless, once a task is spawned using `scope.spawn()`, it will
409/// execute, even if the spawning task should later panic. `in_place_scope()`
410/// returns once all spawned jobs have completed, and any panics are
411/// propagated at that point.
412pub fn in_place_scope<'scope, OP, R>(op: OP) -> R
413where
414    OP: FnOnce(&Scope<'scope>) -> R,
415{
416    do_in_place_scope(None, op)
417}
418
419pub(crate) fn do_in_place_scope<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
420where
421    OP: FnOnce(&Scope<'scope>) -> R,
422{
423    let thread = unsafe { WorkerThread::current().as_ref() };
424    let scope = Scope::<'scope>::new(thread, registry);
425    scope.base.complete(thread, || op(&scope))
426}
427
428/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
429/// closure with a reference to `s`. This closure can then spawn
430/// asynchronous tasks into `s`. Those tasks may run asynchronously with
431/// respect to the closure; they may themselves spawn additional tasks
432/// into `s`. When the closure returns, it will block until all tasks
433/// that have been spawned into `s` complete.
434///
435/// This is just like `scope_fifo()` except the closure runs on the same thread
436/// that calls `in_place_scope_fifo()`. Only work that it spawns runs in the
437/// thread pool.
438///
439/// # Panics
440///
441/// If a panic occurs, either in the closure given to `in_place_scope_fifo()` or in
442/// any of the spawned jobs, that panic will be propagated and the
/// call to `in_place_scope_fifo()` will panic. If multiple panics occur, it is
444/// non-deterministic which of their panic values will propagate.
445/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it will
446/// execute, even if the spawning task should later panic. `in_place_scope_fifo()`
447/// returns once all spawned jobs have completed, and any panics are
448/// propagated at that point.
449pub fn in_place_scope_fifo<'scope, OP, R>(op: OP) -> R
450where
451    OP: FnOnce(&ScopeFifo<'scope>) -> R,
452{
453    do_in_place_scope_fifo(None, op)
454}
455
456pub(crate) fn do_in_place_scope_fifo<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
457where
458    OP: FnOnce(&ScopeFifo<'scope>) -> R,
459{
460    let thread = unsafe { WorkerThread::current().as_ref() };
461    let scope = ScopeFifo::<'scope>::new(thread, registry);
462    scope.base.complete(thread, || op(&scope))
463}
464
465impl<'scope> Scope<'scope> {
466    fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
467        let base = ScopeBase::new(owner, registry);
468        Scope { base }
469    }
470
471    /// Spawns a job into the fork-join scope `self`. This job will
472    /// execute sometime before the fork-join scope completes. The
473    /// job is specified as a closure, and this closure receives its
474    /// own reference to the scope `self` as argument. This can be
475    /// used to inject new jobs into `self`.
476    ///
477    /// # Returns
478    ///
479    /// Nothing. The spawned closures cannot pass back values to the
480    /// caller directly, though they can write to local variables on
481    /// the stack (if those variables outlive the scope) or
482    /// communicate through shared channels.
483    ///
484    /// (The intention is to eventually integrate with Rust futures to
485    /// support spawns of functions that compute a value.)
486    ///
487    /// # Examples
488    ///
489    /// ```rust
490    /// # use rustc_thread_pool as rayon;
491    /// let mut value_a = None;
492    /// let mut value_b = None;
493    /// let mut value_c = None;
494    /// rayon::scope(|s| {
495    ///     s.spawn(|s1| {
496    ///           // ^ this is the same scope as `s`; this handle `s1`
497    ///           //   is intended for use by the spawned task,
498    ///           //   since scope handles cannot cross thread boundaries.
499    ///
500    ///         value_a = Some(22);
501    ///
502    ///         // the scope `s` will not end until all these tasks are done
503    ///         s1.spawn(|_| {
504    ///             value_b = Some(44);
505    ///         });
506    ///     });
507    ///
508    ///     s.spawn(|_| {
509    ///         value_c = Some(66);
510    ///     });
511    /// });
512    /// assert_eq!(value_a, Some(22));
513    /// assert_eq!(value_b, Some(44));
514    /// assert_eq!(value_c, Some(66));
515    /// ```
516    ///
517    /// # See also
518    ///
519    /// The [`scope` function] has more extensive documentation about
520    /// task spawning.
521    ///
522    /// [`scope` function]: fn.scope.html
523    pub fn spawn<BODY>(&self, body: BODY)
524    where
525        BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
526    {
527        let scope_ptr = ScopePtr(self);
528        let job = HeapJob::new(self.base.tlv, move || unsafe {
529            // SAFETY: this job will execute before the scope ends.
530            let scope = scope_ptr.as_ref();
531            ScopeBase::execute_job(&scope.base, move || body(scope))
532        });
533        let job_ref = self.base.heap_job_ref(job);
534
535        // Since `Scope` implements `Sync`, we can't be sure that we're still in a
536        // thread of this pool, so we can't just push to the local worker thread.
537        // Also, this might be an in-place scope.
538        self.base.registry.inject_or_push(job_ref);
539    }
540
541    /// Spawns a job into every thread of the fork-join scope `self`. This job will
542    /// execute on each thread sometime before the fork-join scope completes. The
543    /// job is specified as a closure, and this closure receives its own reference
544    /// to the scope `self` as argument, as well as a `BroadcastContext`.
545    pub fn spawn_broadcast<BODY>(&self, body: BODY)
546    where
547        BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
548    {
549        let scope_ptr = ScopePtr(self);
550        let job = ArcJob::new(move || unsafe {
551            // SAFETY: this job will execute before the scope ends.
552            let scope = scope_ptr.as_ref();
553            let body = &body;
554            let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
555            ScopeBase::execute_job(&scope.base, func)
556        });
557        self.base.inject_broadcast(job)
558    }
559}
560
561impl<'scope> ScopeFifo<'scope> {
562    fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
563        let base = ScopeBase::new(owner, registry);
564        let num_threads = base.registry.num_threads();
565        let fifos = (0..num_threads).map(|_| JobFifo::new()).collect();
566        ScopeFifo { base, fifos }
567    }
568
569    /// Spawns a job into the fork-join scope `self`. This job will
570    /// execute sometime before the fork-join scope completes. The
571    /// job is specified as a closure, and this closure receives its
572    /// own reference to the scope `self` as argument. This can be
573    /// used to inject new jobs into `self`.
574    ///
575    /// # See also
576    ///
577    /// This method is akin to [`Scope::spawn()`], but with a FIFO
578    /// priority. The [`scope_fifo` function] has more details about
579    /// this distinction.
580    ///
581    /// [`Scope::spawn()`]: struct.Scope.html#method.spawn
582    /// [`scope_fifo` function]: fn.scope_fifo.html
583    pub fn spawn_fifo<BODY>(&self, body: BODY)
584    where
585        BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
586    {
587        let scope_ptr = ScopePtr(self);
588        let job = HeapJob::new(self.base.tlv, move || unsafe {
589            // SAFETY: this job will execute before the scope ends.
590            let scope = scope_ptr.as_ref();
591            ScopeBase::execute_job(&scope.base, move || body(scope))
592        });
593        let job_ref = self.base.heap_job_ref(job);
594
595        // If we're in the pool, use our scope's private fifo for this thread to execute
596        // in a locally-FIFO order. Otherwise, just use the pool's global injector.
597        match self.base.registry.current_thread() {
598            Some(worker) => {
599                let fifo = &self.fifos[worker.index()];
600                // SAFETY: this job will execute before the scope ends.
601                unsafe { worker.push(fifo.push(job_ref)) };
602            }
603            None => self.base.registry.inject(job_ref),
604        }
605    }
606
607    /// Spawns a job into every thread of the fork-join scope `self`. This job will
608    /// execute on each thread sometime before the fork-join scope completes. The
609    /// job is specified as a closure, and this closure receives its own reference
610    /// to the scope `self` as argument, as well as a `BroadcastContext`.
611    pub fn spawn_broadcast<BODY>(&self, body: BODY)
612    where
613        BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
614    {
615        let scope_ptr = ScopePtr(self);
616        let job = ArcJob::new(move || unsafe {
617            // SAFETY: this job will execute before the scope ends.
618            let scope = scope_ptr.as_ref();
619            let body = &body;
620            let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
621            ScopeBase::execute_job(&scope.base, func)
622        });
623        self.base.inject_broadcast(job)
624    }
625}
626
627impl<'scope> ScopeBase<'scope> {
628    /// Creates the base of a new scope for the given registry
629    fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
630        let registry = registry.unwrap_or_else(|| match owner {
631            Some(owner) => owner.registry(),
632            None => global_registry(),
633        });
634
635        ScopeBase {
636            registry: Arc::clone(registry),
637            panic: AtomicPtr::new(ptr::null_mut()),
638            job_completed_latch: CountLatch::new(owner),
639            marker: PhantomData,
640            tlv: tlv::get(),
641        }
642    }
643
644    fn heap_job_ref<FUNC>(&self, job: Box<HeapJob<FUNC>>) -> JobRef
645    where
646        FUNC: FnOnce() + Send + 'scope,
647    {
648        unsafe {
649            self.job_completed_latch.increment();
650            job.into_job_ref()
651        }
652    }
653
654    fn inject_broadcast<FUNC>(&self, job: Arc<ArcJob<FUNC>>)
655    where
656        FUNC: Fn() + Send + Sync + 'scope,
657    {
658        let n_threads = self.registry.num_threads();
659        let job_refs = (0..n_threads).map(|_| unsafe {
660            self.job_completed_latch.increment();
661            ArcJob::as_job_ref(&job)
662        });
663
664        self.registry.inject_broadcast(job_refs);
665    }
666
667    /// Executes `func` as a job, either aborting or executing as
668    /// appropriate.
669    fn complete<FUNC, R>(&self, owner: Option<&WorkerThread>, func: FUNC) -> R
670    where
671        FUNC: FnOnce() -> R,
672    {
673        let result = unsafe { Self::execute_job_closure(self, func) };
674        self.job_completed_latch.wait(owner);
675
676        // Restore the TLV if we ran some jobs while waiting
677        tlv::set(self.tlv);
678
679        self.maybe_propagate_panic();
680        result.unwrap() // only None if `op` panicked, and that would have been propagated
681    }
682
683    /// Executes `func` as a job, either aborting or executing as
684    /// appropriate.
685    unsafe fn execute_job<FUNC>(this: *const Self, func: FUNC)
686    where
687        FUNC: FnOnce(),
688    {
689        let _: Option<()> = unsafe { Self::execute_job_closure(this, func) };
690    }
691
692    /// Executes `func` as a job in scope. Adjusts the "job completed"
693    /// counters and also catches any panic and stores it into
694    /// `scope`.
695    unsafe fn execute_job_closure<FUNC, R>(this: *const Self, func: FUNC) -> Option<R>
696    where
697        FUNC: FnOnce() -> R,
698    {
699        let result = match unwind::halt_unwinding(func) {
700            Ok(r) => Some(r),
701            Err(err) => {
702                unsafe { (*this).job_panicked(err) };
703                None
704            }
705        };
706        unsafe { Latch::set(&(*this).job_completed_latch) };
707        result
708    }
709
710    fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
711        // capture the first error we see, free the rest
712        if self.panic.load(Ordering::Relaxed).is_null() {
713            let nil = ptr::null_mut();
714            let mut err = ManuallyDrop::new(Box::new(err)); // box up the fat ptr
715            let err_ptr: *mut Box<dyn Any + Send + 'static> = &mut **err;
716            if self
717                .panic
718                .compare_exchange(nil, err_ptr, Ordering::Release, Ordering::Relaxed)
719                .is_ok()
720            {
721                // ownership now transferred into self.panic
722            } else {
723                // another panic raced in ahead of us, so drop ours
724                let _: Box<Box<_>> = ManuallyDrop::into_inner(err);
725            }
726        }
727    }
728
729    fn maybe_propagate_panic(&self) {
730        // propagate panic, if any occurred; at this point, all
731        // outstanding jobs have completed, so we can use a relaxed
732        // ordering:
733        let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
734        if !panic.is_null() {
735            let value = unsafe { Box::from_raw(panic) };
736
737            // Restore the TLV if we ran some jobs while waiting
738            tlv::set(self.tlv);
739
740            unwind::resume_unwinding(*value);
741        }
742    }
743}
744
745impl<'scope> fmt::Debug for Scope<'scope> {
746    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
747        fmt.debug_struct("Scope")
748            .field("pool_id", &self.base.registry.id())
749            .field("panic", &self.base.panic)
750            .field("job_completed_latch", &self.base.job_completed_latch)
751            .finish()
752    }
753}
754
755impl<'scope> fmt::Debug for ScopeFifo<'scope> {
756    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
757        fmt.debug_struct("ScopeFifo")
758            .field("num_fifos", &self.fifos.len())
759            .field("pool_id", &self.base.registry.id())
760            .field("panic", &self.base.panic)
761            .field("job_completed_latch", &self.base.job_completed_latch)
762            .finish()
763    }
764}
765
/// Carries a scope's `&Self` into a job as a raw pointer, so no lifetime has
/// to be faked.
///
/// Dereferencing still needs unsafe code, which is acceptable in scope jobs
/// because they are guaranteed to execute before the scope ends.
struct ScopePtr<T>(*const T);

// SAFETY: !Send for raw pointers is not for safety, just as a lint
unsafe impl<T: Sync> Send for ScopePtr<T> {}

// SAFETY: !Sync for raw pointers is not for safety, just as a lint
unsafe impl<T: Sync> Sync for ScopePtr<T> {}

impl<T> ScopePtr<T> {
    /// Reborrows the pointee as a shared reference.
    ///
    /// Going through a method (rather than touching `scope_ptr.0` in the
    /// closure) avoids a disjoint capture of the raw-pointer field.
    ///
    /// # Safety
    ///
    /// The pointer must still refer to a live `T` for the whole lifetime of
    /// the returned reference.
    unsafe fn as_ref(&self) -> &T {
        let raw: *const T = self.0;
        unsafe { &*raw }
    }
}
783}