rustc_thread_pool/thread_pool/
mod.rs

1//! Contains support for user-managed thread pools, represented by the
2//! the [`ThreadPool`] type (see that struct for details).
3//!
4//! [`ThreadPool`]: struct.ThreadPool.html
5
6use std::error::Error;
7use std::fmt;
8use std::sync::Arc;
9
10use crate::broadcast::{self, BroadcastContext};
11use crate::registry::{Registry, ThreadSpawn, WorkerThread};
12use crate::scope::{do_in_place_scope, do_in_place_scope_fifo};
13use crate::{
14    Scope, ScopeFifo, ThreadPoolBuildError, ThreadPoolBuilder, join, scope, scope_fifo, spawn,
15};
16
17mod tests;
18
19/// Represents a user created [thread-pool].
20///
21/// Use a [`ThreadPoolBuilder`] to specify the number and/or names of threads
22/// in the pool. After calling [`ThreadPoolBuilder::build()`], you can then
23/// execute functions explicitly within this [`ThreadPool`] using
24/// [`ThreadPool::install()`]. By contrast, top level rayon functions
25/// (like `join()`) will execute implicitly within the current thread-pool.
26///
27///
28/// ## Creating a ThreadPool
29///
30/// ```rust
31/// # use rustc_thread_pool as rayon;
32/// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
33/// ```
34///
35/// [`install()`][`ThreadPool::install()`] executes a closure in one of the `ThreadPool`'s
36/// threads. In addition, any other rayon operations called inside of `install()` will also
37/// execute in the context of the `ThreadPool`.
38///
39/// When the `ThreadPool` is dropped, that's a signal for the threads it manages to terminate,
40/// they will complete executing any remaining work that you have spawned, and automatically
41/// terminate.
42///
43///
44/// [thread-pool]: https://en.wikipedia.org/wiki/Thread_pool
45/// [`ThreadPool`]: struct.ThreadPool.html
46/// [`ThreadPool::new()`]: struct.ThreadPool.html#method.new
47/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
48/// [`ThreadPoolBuilder::build()`]: struct.ThreadPoolBuilder.html#method.build
49/// [`ThreadPool::install()`]: struct.ThreadPool.html#method.install
50pub struct ThreadPool {
51    registry: Arc<Registry>,
52}
53
54impl ThreadPool {
55    #[deprecated(note = "Use `ThreadPoolBuilder::build`")]
56    #[allow(deprecated)]
57    /// Deprecated in favor of `ThreadPoolBuilder::build`.
58    pub fn new(configuration: crate::Configuration) -> Result<ThreadPool, Box<dyn Error>> {
59        Self::build(configuration.into_builder()).map_err(Box::from)
60    }
61
62    pub(super) fn build<S>(
63        builder: ThreadPoolBuilder<S>,
64    ) -> Result<ThreadPool, ThreadPoolBuildError>
65    where
66        S: ThreadSpawn,
67    {
68        let registry = Registry::new(builder)?;
69        Ok(ThreadPool { registry })
70    }
71
72    /// Executes `op` within the threadpool. Any attempts to use
73    /// `join`, `scope`, or parallel iterators will then operate
74    /// within that threadpool.
75    ///
76    /// # Warning: thread-local data
77    ///
78    /// Because `op` is executing within the Rayon thread-pool,
79    /// thread-local data from the current thread will not be
80    /// accessible.
81    ///
82    /// # Warning: execution order
83    ///
84    /// If the current thread is part of a different thread pool, it will try to
85    /// keep busy while the `op` completes in its target pool, similar to
86    /// calling [`ThreadPool::yield_now()`] in a loop. Therefore, it may
87    /// potentially schedule other tasks to run on the current thread in the
88    /// meantime. For example
89    ///
90    /// ```rust
91    /// # use rustc_thread_pool as rayon;
92    /// fn main() {
93    ///     rayon::ThreadPoolBuilder::new().num_threads(1).build_global().unwrap();
94    ///     let pool = rustc_thread_pool::ThreadPoolBuilder::default().build().unwrap();
95    ///     let do_it = || {
96    ///         print!("one ");
97    ///         pool.install(||{});
98    ///         print!("two ");
99    ///     };
100    ///     rayon::join(|| do_it(), || do_it());
101    /// }
102    /// ```
103    ///
104    /// Since we configured just one thread in the global pool, one might
105    /// expect `do_it()` to run sequentially, producing:
106    ///
107    /// ```ascii
108    /// one two one two
109    /// ```
110    ///
111    /// However each call to `install()` yields implicitly, allowing rayon to
112    /// run multiple instances of `do_it()` concurrently on the single, global
113    /// thread. The following output would be equally valid:
114    ///
115    /// ```ascii
116    /// one one two two
117    /// ```
118    ///
119    /// # Panics
120    ///
121    /// If `op` should panic, that panic will be propagated.
122    ///
123    /// ## Using `install()`
124    ///
125    /// ```rust
126    ///    # use rustc_thread_pool as rayon;
127    ///    fn main() {
128    ///         let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
129    ///         let n = pool.install(|| fib(20));
130    ///         println!("{}", n);
131    ///    }
132    ///
133    ///    fn fib(n: usize) -> usize {
134    ///         if n == 0 || n == 1 {
135    ///             return n;
136    ///         }
137    ///         let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool`
138    ///         return a + b;
139    ///     }
140    /// ```
141    pub fn install<OP, R>(&self, op: OP) -> R
142    where
143        OP: FnOnce() -> R + Send,
144        R: Send,
145    {
146        self.registry.in_worker(|_, _| op())
147    }
148
149    /// Executes `op` within every thread in the threadpool. Any attempts to use
150    /// `join`, `scope`, or parallel iterators will then operate within that
151    /// threadpool.
152    ///
153    /// Broadcasts are executed on each thread after they have exhausted their
154    /// local work queue, before they attempt work-stealing from other threads.
155    /// The goal of that strategy is to run everywhere in a timely manner
156    /// *without* being too disruptive to current work. There may be alternative
157    /// broadcast styles added in the future for more or less aggressive
158    /// injection, if the need arises.
159    ///
160    /// # Warning: thread-local data
161    ///
162    /// Because `op` is executing within the Rayon thread-pool,
163    /// thread-local data from the current thread will not be
164    /// accessible.
165    ///
166    /// # Panics
167    ///
168    /// If `op` should panic on one or more threads, exactly one panic
169    /// will be propagated, only after all threads have completed
170    /// (or panicked) their own `op`.
171    ///
172    /// # Examples
173    ///
174    /// ```
175    ///    # use rustc_thread_pool as rayon;
176    ///    use std::sync::atomic::{AtomicUsize, Ordering};
177    ///
178    ///    fn main() {
179    ///         let pool = rayon::ThreadPoolBuilder::new().num_threads(5).build().unwrap();
180    ///
181    ///         // The argument gives context, including the index of each thread.
182    ///         let v: Vec<usize> = pool.broadcast(|ctx| ctx.index() * ctx.index());
183    ///         assert_eq!(v, &[0, 1, 4, 9, 16]);
184    ///
185    ///         // The closure can reference the local stack
186    ///         let count = AtomicUsize::new(0);
187    ///         pool.broadcast(|_| count.fetch_add(1, Ordering::Relaxed));
188    ///         assert_eq!(count.into_inner(), 5);
189    ///    }
190    /// ```
191    pub fn broadcast<OP, R>(&self, op: OP) -> Vec<R>
192    where
193        OP: Fn(BroadcastContext<'_>) -> R + Sync,
194        R: Send,
195    {
196        // We assert that `self.registry` has not terminated.
197        unsafe { broadcast::broadcast_in(op, &self.registry) }
198    }
199
200    /// Returns the (current) number of threads in the thread pool.
201    ///
202    /// # Future compatibility note
203    ///
204    /// Note that unless this thread-pool was created with a
205    /// [`ThreadPoolBuilder`] that specifies the number of threads,
206    /// then this number may vary over time in future versions (see [the
207    /// `num_threads()` method for details][snt]).
208    ///
209    /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
210    /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
211    #[inline]
212    pub fn current_num_threads(&self) -> usize {
213        self.registry.num_threads()
214    }
215
216    /// If called from a Rayon worker thread in this thread-pool,
217    /// returns the index of that thread; if not called from a Rayon
218    /// thread, or called from a Rayon thread that belongs to a
219    /// different thread-pool, returns `None`.
220    ///
221    /// The index for a given thread will not change over the thread's
222    /// lifetime. However, multiple threads may share the same index if
223    /// they are in distinct thread-pools.
224    ///
225    /// # Future compatibility note
226    ///
227    /// Currently, every thread-pool (including the global
228    /// thread-pool) has a fixed number of threads, but this may
229    /// change in future Rayon versions (see [the `num_threads()` method
230    /// for details][snt]). In that case, the index for a
231    /// thread would not change during its lifetime, but thread
232    /// indices may wind up being reused if threads are terminated and
233    /// restarted.
234    ///
235    /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
236    #[inline]
237    pub fn current_thread_index(&self) -> Option<usize> {
238        let curr = self.registry.current_thread()?;
239        Some(curr.index())
240    }
241
242    /// Returns true if the current worker thread currently has "local
243    /// tasks" pending. This can be useful as part of a heuristic for
244    /// deciding whether to spawn a new task or execute code on the
245    /// current thread, particularly in breadth-first
246    /// schedulers. However, keep in mind that this is an inherently
247    /// racy check, as other worker threads may be actively "stealing"
248    /// tasks from our local deque.
249    ///
250    /// **Background:** Rayon's uses a [work-stealing] scheduler. The
251    /// key idea is that each thread has its own [deque] of
252    /// tasks. Whenever a new task is spawned -- whether through
253    /// `join()`, `Scope::spawn()`, or some other means -- that new
254    /// task is pushed onto the thread's *local* deque. Worker threads
255    /// have a preference for executing their own tasks; if however
256    /// they run out of tasks, they will go try to "steal" tasks from
257    /// other threads. This function therefore has an inherent race
258    /// with other active worker threads, which may be removing items
259    /// from the local deque.
260    ///
261    /// [work-stealing]: https://en.wikipedia.org/wiki/Work_stealing
262    /// [deque]: https://en.wikipedia.org/wiki/Double-ended_queue
263    #[inline]
264    pub fn current_thread_has_pending_tasks(&self) -> Option<bool> {
265        let curr = self.registry.current_thread()?;
266        Some(!curr.local_deque_is_empty())
267    }
268
269    /// Execute `oper_a` and `oper_b` in the thread-pool and return
270    /// the results. Equivalent to `self.install(|| join(oper_a,
271    /// oper_b))`.
272    pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
273    where
274        A: FnOnce() -> RA + Send,
275        B: FnOnce() -> RB + Send,
276        RA: Send,
277        RB: Send,
278    {
279        self.install(|| join(oper_a, oper_b))
280    }
281
282    /// Creates a scope that executes within this thread-pool.
283    /// Equivalent to `self.install(|| scope(...))`.
284    ///
285    /// See also: [the `scope()` function][scope].
286    ///
287    /// [scope]: fn.scope.html
288    pub fn scope<'scope, OP, R>(&self, op: OP) -> R
289    where
290        OP: FnOnce(&Scope<'scope>) -> R + Send,
291        R: Send,
292    {
293        self.install(|| scope(op))
294    }
295
296    /// Creates a scope that executes within this thread-pool.
297    /// Spawns from the same thread are prioritized in relative FIFO order.
298    /// Equivalent to `self.install(|| scope_fifo(...))`.
299    ///
300    /// See also: [the `scope_fifo()` function][scope_fifo].
301    ///
302    /// [scope_fifo]: fn.scope_fifo.html
303    pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R
304    where
305        OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
306        R: Send,
307    {
308        self.install(|| scope_fifo(op))
309    }
310
311    /// Creates a scope that spawns work into this thread-pool.
312    ///
313    /// See also: [the `in_place_scope()` function][in_place_scope].
314    ///
315    /// [in_place_scope]: fn.in_place_scope.html
316    pub fn in_place_scope<'scope, OP, R>(&self, op: OP) -> R
317    where
318        OP: FnOnce(&Scope<'scope>) -> R,
319    {
320        do_in_place_scope(Some(&self.registry), op)
321    }
322
323    /// Creates a scope that spawns work into this thread-pool in FIFO order.
324    ///
325    /// See also: [the `in_place_scope_fifo()` function][in_place_scope_fifo].
326    ///
327    /// [in_place_scope_fifo]: fn.in_place_scope_fifo.html
328    pub fn in_place_scope_fifo<'scope, OP, R>(&self, op: OP) -> R
329    where
330        OP: FnOnce(&ScopeFifo<'scope>) -> R,
331    {
332        do_in_place_scope_fifo(Some(&self.registry), op)
333    }
334
335    /// Spawns an asynchronous task in this thread-pool. This task will
336    /// run in the implicit, global scope, which means that it may outlast
337    /// the current stack frame -- therefore, it cannot capture any references
338    /// onto the stack (you will likely need a `move` closure).
339    ///
340    /// See also: [the `spawn()` function defined on scopes][spawn].
341    ///
342    /// [spawn]: struct.Scope.html#method.spawn
343    pub fn spawn<OP>(&self, op: OP)
344    where
345        OP: FnOnce() + Send + 'static,
346    {
347        // We assert that `self.registry` has not terminated.
348        unsafe { spawn::spawn_in(op, &self.registry) }
349    }
350
351    /// Spawns an asynchronous task in this thread-pool. This task will
352    /// run in the implicit, global scope, which means that it may outlast
353    /// the current stack frame -- therefore, it cannot capture any references
354    /// onto the stack (you will likely need a `move` closure).
355    ///
356    /// See also: [the `spawn_fifo()` function defined on scopes][spawn_fifo].
357    ///
358    /// [spawn_fifo]: struct.ScopeFifo.html#method.spawn_fifo
359    pub fn spawn_fifo<OP>(&self, op: OP)
360    where
361        OP: FnOnce() + Send + 'static,
362    {
363        // We assert that `self.registry` has not terminated.
364        unsafe { spawn::spawn_fifo_in(op, &self.registry) }
365    }
366
367    /// Spawns an asynchronous task on every thread in this thread-pool. This task
368    /// will run in the implicit, global scope, which means that it may outlast the
369    /// current stack frame -- therefore, it cannot capture any references onto the
370    /// stack (you will likely need a `move` closure).
371    pub fn spawn_broadcast<OP>(&self, op: OP)
372    where
373        OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
374    {
375        // We assert that `self.registry` has not terminated.
376        unsafe { broadcast::spawn_broadcast_in(op, &self.registry) }
377    }
378
379    /// Cooperatively yields execution to Rayon.
380    ///
381    /// This is similar to the general [`yield_now()`], but only if the current
382    /// thread is part of *this* thread pool.
383    ///
384    /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
385    /// nothing was available, or `None` if the current thread is not part this pool.
386    pub fn yield_now(&self) -> Option<Yield> {
387        let curr = self.registry.current_thread()?;
388        Some(curr.yield_now())
389    }
390
391    /// Cooperatively yields execution to local Rayon work.
392    ///
393    /// This is similar to the general [`yield_local()`], but only if the current
394    /// thread is part of *this* thread pool.
395    ///
396    /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
397    /// nothing was available, or `None` if the current thread is not part this pool.
398    pub fn yield_local(&self) -> Option<Yield> {
399        let curr = self.registry.current_thread()?;
400        Some(curr.yield_local())
401    }
402
403    pub(crate) fn wait_until_stopped(self) {
404        let registry = Arc::clone(&self.registry);
405        drop(self);
406        registry.wait_until_stopped();
407    }
408}
409
410impl Drop for ThreadPool {
411    fn drop(&mut self) {
412        self.registry.terminate();
413    }
414}
415
416impl fmt::Debug for ThreadPool {
417    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
418        fmt.debug_struct("ThreadPool")
419            .field("num_threads", &self.current_num_threads())
420            .field("id", &self.registry.id())
421            .finish()
422    }
423}
424
425/// If called from a Rayon worker thread, returns the index of that
426/// thread within its current pool; if not called from a Rayon thread,
427/// returns `None`.
428///
429/// The index for a given thread will not change over the thread's
430/// lifetime. However, multiple threads may share the same index if
431/// they are in distinct thread-pools.
432///
433/// See also: [the `ThreadPool::current_thread_index()` method].
434///
435/// [m]: struct.ThreadPool.html#method.current_thread_index
436///
437/// # Future compatibility note
438///
439/// Currently, every thread-pool (including the global
440/// thread-pool) has a fixed number of threads, but this may
441/// change in future Rayon versions (see [the `num_threads()` method
442/// for details][snt]). In that case, the index for a
443/// thread would not change during its lifetime, but thread
444/// indices may wind up being reused if threads are terminated and
445/// restarted.
446///
447/// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
448#[inline]
449pub fn current_thread_index() -> Option<usize> {
450    unsafe {
451        let curr = WorkerThread::current().as_ref()?;
452        Some(curr.index())
453    }
454}
455
456/// If called from a Rayon worker thread, indicates whether that
457/// thread's local deque still has pending tasks. Otherwise, returns
458/// `None`. For more information, see [the
459/// `ThreadPool::current_thread_has_pending_tasks()` method][m].
460///
461/// [m]: struct.ThreadPool.html#method.current_thread_has_pending_tasks
462#[inline]
463pub fn current_thread_has_pending_tasks() -> Option<bool> {
464    unsafe {
465        let curr = WorkerThread::current().as_ref()?;
466        Some(!curr.local_deque_is_empty())
467    }
468}
469
470/// Cooperatively yields execution to Rayon.
471///
472/// If the current thread is part of a rayon thread pool, this looks for a
473/// single unit of pending work in the pool, then executes it. Completion of
474/// that work might include nested work or further work stealing.
475///
476/// This is similar to [`std::thread::yield_now()`], but does not literally make
477/// that call. If you are implementing a polling loop, you may want to also
478/// yield to the OS scheduler yourself if no Rayon work was found.
479///
480/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
481/// nothing was available, or `None` if this thread is not part of any pool at all.
482pub fn yield_now() -> Option<Yield> {
483    unsafe {
484        let thread = WorkerThread::current().as_ref()?;
485        Some(thread.yield_now())
486    }
487}
488
489/// Cooperatively yields execution to local Rayon work.
490///
491/// If the current thread is part of a rayon thread pool, this looks for a
492/// single unit of pending work in this thread's queue, then executes it.
493/// Completion of that work might include nested work or further work stealing.
494///
495/// This is similar to [`yield_now()`], but does not steal from other threads.
496///
497/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
498/// nothing was available, or `None` if this thread is not part of any pool at all.
499pub fn yield_local() -> Option<Yield> {
500    unsafe {
501        let thread = WorkerThread::current().as_ref()?;
502        Some(thread.yield_local())
503    }
504}
505
506/// Result of [`yield_now()`] or [`yield_local()`].
507#[derive(Clone, Copy, Debug, PartialEq, Eq)]
508pub enum Yield {
509    /// Work was found and executed.
510    Executed,
511    /// No available work was found.
512    Idle,
513}