rustc_thread_pool/thread_pool/mod.rs
1//! Contains support for user-managed thread pools, represented by the
2//! the [`ThreadPool`] type (see that struct for details).
3//!
4//! [`ThreadPool`]: struct.ThreadPool.html
5
6use std::error::Error;
7use std::fmt;
8use std::sync::Arc;
9
10use crate::broadcast::{self, BroadcastContext};
11use crate::registry::{Registry, ThreadSpawn, WorkerThread};
12use crate::scope::{do_in_place_scope, do_in_place_scope_fifo};
13use crate::{
14 Scope, ScopeFifo, ThreadPoolBuildError, ThreadPoolBuilder, join, scope, scope_fifo, spawn,
15};
16
17mod tests;
18
19/// Represents a user created [thread-pool].
20///
21/// Use a [`ThreadPoolBuilder`] to specify the number and/or names of threads
22/// in the pool. After calling [`ThreadPoolBuilder::build()`], you can then
23/// execute functions explicitly within this [`ThreadPool`] using
24/// [`ThreadPool::install()`]. By contrast, top level rayon functions
25/// (like `join()`) will execute implicitly within the current thread-pool.
26///
27///
28/// ## Creating a ThreadPool
29///
30/// ```rust
31/// # use rustc_thread_pool as rayon;
32/// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
33/// ```
34///
35/// [`install()`][`ThreadPool::install()`] executes a closure in one of the `ThreadPool`'s
36/// threads. In addition, any other rayon operations called inside of `install()` will also
37/// execute in the context of the `ThreadPool`.
38///
39/// When the `ThreadPool` is dropped, that's a signal for the threads it manages to terminate,
40/// they will complete executing any remaining work that you have spawned, and automatically
41/// terminate.
42///
43///
44/// [thread-pool]: https://en.wikipedia.org/wiki/Thread_pool
45/// [`ThreadPool`]: struct.ThreadPool.html
46/// [`ThreadPool::new()`]: struct.ThreadPool.html#method.new
47/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
48/// [`ThreadPoolBuilder::build()`]: struct.ThreadPoolBuilder.html#method.build
49/// [`ThreadPool::install()`]: struct.ThreadPool.html#method.install
50pub struct ThreadPool {
51 registry: Arc<Registry>,
52}
53
54impl ThreadPool {
55 #[deprecated(note = "Use `ThreadPoolBuilder::build`")]
56 #[allow(deprecated)]
57 /// Deprecated in favor of `ThreadPoolBuilder::build`.
58 pub fn new(configuration: crate::Configuration) -> Result<ThreadPool, Box<dyn Error>> {
59 Self::build(configuration.into_builder()).map_err(Box::from)
60 }
61
62 pub(super) fn build<S>(
63 builder: ThreadPoolBuilder<S>,
64 ) -> Result<ThreadPool, ThreadPoolBuildError>
65 where
66 S: ThreadSpawn,
67 {
68 let registry = Registry::new(builder)?;
69 Ok(ThreadPool { registry })
70 }
71
72 /// Executes `op` within the threadpool. Any attempts to use
73 /// `join`, `scope`, or parallel iterators will then operate
74 /// within that threadpool.
75 ///
76 /// # Warning: thread-local data
77 ///
78 /// Because `op` is executing within the Rayon thread-pool,
79 /// thread-local data from the current thread will not be
80 /// accessible.
81 ///
82 /// # Warning: execution order
83 ///
84 /// If the current thread is part of a different thread pool, it will try to
85 /// keep busy while the `op` completes in its target pool, similar to
86 /// calling [`ThreadPool::yield_now()`] in a loop. Therefore, it may
87 /// potentially schedule other tasks to run on the current thread in the
88 /// meantime. For example
89 ///
90 /// ```rust
91 /// # use rustc_thread_pool as rayon;
92 /// fn main() {
93 /// rayon::ThreadPoolBuilder::new().num_threads(1).build_global().unwrap();
94 /// let pool = rustc_thread_pool::ThreadPoolBuilder::default().build().unwrap();
95 /// let do_it = || {
96 /// print!("one ");
97 /// pool.install(||{});
98 /// print!("two ");
99 /// };
100 /// rayon::join(|| do_it(), || do_it());
101 /// }
102 /// ```
103 ///
104 /// Since we configured just one thread in the global pool, one might
105 /// expect `do_it()` to run sequentially, producing:
106 ///
107 /// ```ascii
108 /// one two one two
109 /// ```
110 ///
111 /// However each call to `install()` yields implicitly, allowing rayon to
112 /// run multiple instances of `do_it()` concurrently on the single, global
113 /// thread. The following output would be equally valid:
114 ///
115 /// ```ascii
116 /// one one two two
117 /// ```
118 ///
119 /// # Panics
120 ///
121 /// If `op` should panic, that panic will be propagated.
122 ///
123 /// ## Using `install()`
124 ///
125 /// ```rust
126 /// # use rustc_thread_pool as rayon;
127 /// fn main() {
128 /// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
129 /// let n = pool.install(|| fib(20));
130 /// println!("{}", n);
131 /// }
132 ///
133 /// fn fib(n: usize) -> usize {
134 /// if n == 0 || n == 1 {
135 /// return n;
136 /// }
137 /// let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool`
138 /// return a + b;
139 /// }
140 /// ```
141 pub fn install<OP, R>(&self, op: OP) -> R
142 where
143 OP: FnOnce() -> R + Send,
144 R: Send,
145 {
146 self.registry.in_worker(|_, _| op())
147 }
148
149 /// Executes `op` within every thread in the threadpool. Any attempts to use
150 /// `join`, `scope`, or parallel iterators will then operate within that
151 /// threadpool.
152 ///
153 /// Broadcasts are executed on each thread after they have exhausted their
154 /// local work queue, before they attempt work-stealing from other threads.
155 /// The goal of that strategy is to run everywhere in a timely manner
156 /// *without* being too disruptive to current work. There may be alternative
157 /// broadcast styles added in the future for more or less aggressive
158 /// injection, if the need arises.
159 ///
160 /// # Warning: thread-local data
161 ///
162 /// Because `op` is executing within the Rayon thread-pool,
163 /// thread-local data from the current thread will not be
164 /// accessible.
165 ///
166 /// # Panics
167 ///
168 /// If `op` should panic on one or more threads, exactly one panic
169 /// will be propagated, only after all threads have completed
170 /// (or panicked) their own `op`.
171 ///
172 /// # Examples
173 ///
174 /// ```
175 /// # use rustc_thread_pool as rayon;
176 /// use std::sync::atomic::{AtomicUsize, Ordering};
177 ///
178 /// fn main() {
179 /// let pool = rayon::ThreadPoolBuilder::new().num_threads(5).build().unwrap();
180 ///
181 /// // The argument gives context, including the index of each thread.
182 /// let v: Vec<usize> = pool.broadcast(|ctx| ctx.index() * ctx.index());
183 /// assert_eq!(v, &[0, 1, 4, 9, 16]);
184 ///
185 /// // The closure can reference the local stack
186 /// let count = AtomicUsize::new(0);
187 /// pool.broadcast(|_| count.fetch_add(1, Ordering::Relaxed));
188 /// assert_eq!(count.into_inner(), 5);
189 /// }
190 /// ```
191 pub fn broadcast<OP, R>(&self, op: OP) -> Vec<R>
192 where
193 OP: Fn(BroadcastContext<'_>) -> R + Sync,
194 R: Send,
195 {
196 // We assert that `self.registry` has not terminated.
197 unsafe { broadcast::broadcast_in(op, &self.registry) }
198 }
199
200 /// Returns the (current) number of threads in the thread pool.
201 ///
202 /// # Future compatibility note
203 ///
204 /// Note that unless this thread-pool was created with a
205 /// [`ThreadPoolBuilder`] that specifies the number of threads,
206 /// then this number may vary over time in future versions (see [the
207 /// `num_threads()` method for details][snt]).
208 ///
209 /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
210 /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
211 #[inline]
212 pub fn current_num_threads(&self) -> usize {
213 self.registry.num_threads()
214 }
215
216 /// If called from a Rayon worker thread in this thread-pool,
217 /// returns the index of that thread; if not called from a Rayon
218 /// thread, or called from a Rayon thread that belongs to a
219 /// different thread-pool, returns `None`.
220 ///
221 /// The index for a given thread will not change over the thread's
222 /// lifetime. However, multiple threads may share the same index if
223 /// they are in distinct thread-pools.
224 ///
225 /// # Future compatibility note
226 ///
227 /// Currently, every thread-pool (including the global
228 /// thread-pool) has a fixed number of threads, but this may
229 /// change in future Rayon versions (see [the `num_threads()` method
230 /// for details][snt]). In that case, the index for a
231 /// thread would not change during its lifetime, but thread
232 /// indices may wind up being reused if threads are terminated and
233 /// restarted.
234 ///
235 /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
236 #[inline]
237 pub fn current_thread_index(&self) -> Option<usize> {
238 let curr = self.registry.current_thread()?;
239 Some(curr.index())
240 }
241
242 /// Returns true if the current worker thread currently has "local
243 /// tasks" pending. This can be useful as part of a heuristic for
244 /// deciding whether to spawn a new task or execute code on the
245 /// current thread, particularly in breadth-first
246 /// schedulers. However, keep in mind that this is an inherently
247 /// racy check, as other worker threads may be actively "stealing"
248 /// tasks from our local deque.
249 ///
250 /// **Background:** Rayon's uses a [work-stealing] scheduler. The
251 /// key idea is that each thread has its own [deque] of
252 /// tasks. Whenever a new task is spawned -- whether through
253 /// `join()`, `Scope::spawn()`, or some other means -- that new
254 /// task is pushed onto the thread's *local* deque. Worker threads
255 /// have a preference for executing their own tasks; if however
256 /// they run out of tasks, they will go try to "steal" tasks from
257 /// other threads. This function therefore has an inherent race
258 /// with other active worker threads, which may be removing items
259 /// from the local deque.
260 ///
261 /// [work-stealing]: https://en.wikipedia.org/wiki/Work_stealing
262 /// [deque]: https://en.wikipedia.org/wiki/Double-ended_queue
263 #[inline]
264 pub fn current_thread_has_pending_tasks(&self) -> Option<bool> {
265 let curr = self.registry.current_thread()?;
266 Some(!curr.local_deque_is_empty())
267 }
268
269 /// Execute `oper_a` and `oper_b` in the thread-pool and return
270 /// the results. Equivalent to `self.install(|| join(oper_a,
271 /// oper_b))`.
272 pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
273 where
274 A: FnOnce() -> RA + Send,
275 B: FnOnce() -> RB + Send,
276 RA: Send,
277 RB: Send,
278 {
279 self.install(|| join(oper_a, oper_b))
280 }
281
282 /// Creates a scope that executes within this thread-pool.
283 /// Equivalent to `self.install(|| scope(...))`.
284 ///
285 /// See also: [the `scope()` function][scope].
286 ///
287 /// [scope]: fn.scope.html
288 pub fn scope<'scope, OP, R>(&self, op: OP) -> R
289 where
290 OP: FnOnce(&Scope<'scope>) -> R + Send,
291 R: Send,
292 {
293 self.install(|| scope(op))
294 }
295
296 /// Creates a scope that executes within this thread-pool.
297 /// Spawns from the same thread are prioritized in relative FIFO order.
298 /// Equivalent to `self.install(|| scope_fifo(...))`.
299 ///
300 /// See also: [the `scope_fifo()` function][scope_fifo].
301 ///
302 /// [scope_fifo]: fn.scope_fifo.html
303 pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R
304 where
305 OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
306 R: Send,
307 {
308 self.install(|| scope_fifo(op))
309 }
310
311 /// Creates a scope that spawns work into this thread-pool.
312 ///
313 /// See also: [the `in_place_scope()` function][in_place_scope].
314 ///
315 /// [in_place_scope]: fn.in_place_scope.html
316 pub fn in_place_scope<'scope, OP, R>(&self, op: OP) -> R
317 where
318 OP: FnOnce(&Scope<'scope>) -> R,
319 {
320 do_in_place_scope(Some(&self.registry), op)
321 }
322
323 /// Creates a scope that spawns work into this thread-pool in FIFO order.
324 ///
325 /// See also: [the `in_place_scope_fifo()` function][in_place_scope_fifo].
326 ///
327 /// [in_place_scope_fifo]: fn.in_place_scope_fifo.html
328 pub fn in_place_scope_fifo<'scope, OP, R>(&self, op: OP) -> R
329 where
330 OP: FnOnce(&ScopeFifo<'scope>) -> R,
331 {
332 do_in_place_scope_fifo(Some(&self.registry), op)
333 }
334
335 /// Spawns an asynchronous task in this thread-pool. This task will
336 /// run in the implicit, global scope, which means that it may outlast
337 /// the current stack frame -- therefore, it cannot capture any references
338 /// onto the stack (you will likely need a `move` closure).
339 ///
340 /// See also: [the `spawn()` function defined on scopes][spawn].
341 ///
342 /// [spawn]: struct.Scope.html#method.spawn
343 pub fn spawn<OP>(&self, op: OP)
344 where
345 OP: FnOnce() + Send + 'static,
346 {
347 // We assert that `self.registry` has not terminated.
348 unsafe { spawn::spawn_in(op, &self.registry) }
349 }
350
351 /// Spawns an asynchronous task in this thread-pool. This task will
352 /// run in the implicit, global scope, which means that it may outlast
353 /// the current stack frame -- therefore, it cannot capture any references
354 /// onto the stack (you will likely need a `move` closure).
355 ///
356 /// See also: [the `spawn_fifo()` function defined on scopes][spawn_fifo].
357 ///
358 /// [spawn_fifo]: struct.ScopeFifo.html#method.spawn_fifo
359 pub fn spawn_fifo<OP>(&self, op: OP)
360 where
361 OP: FnOnce() + Send + 'static,
362 {
363 // We assert that `self.registry` has not terminated.
364 unsafe { spawn::spawn_fifo_in(op, &self.registry) }
365 }
366
367 /// Spawns an asynchronous task on every thread in this thread-pool. This task
368 /// will run in the implicit, global scope, which means that it may outlast the
369 /// current stack frame -- therefore, it cannot capture any references onto the
370 /// stack (you will likely need a `move` closure).
371 pub fn spawn_broadcast<OP>(&self, op: OP)
372 where
373 OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
374 {
375 // We assert that `self.registry` has not terminated.
376 unsafe { broadcast::spawn_broadcast_in(op, &self.registry) }
377 }
378
379 /// Cooperatively yields execution to Rayon.
380 ///
381 /// This is similar to the general [`yield_now()`], but only if the current
382 /// thread is part of *this* thread pool.
383 ///
384 /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
385 /// nothing was available, or `None` if the current thread is not part this pool.
386 pub fn yield_now(&self) -> Option<Yield> {
387 let curr = self.registry.current_thread()?;
388 Some(curr.yield_now())
389 }
390
391 /// Cooperatively yields execution to local Rayon work.
392 ///
393 /// This is similar to the general [`yield_local()`], but only if the current
394 /// thread is part of *this* thread pool.
395 ///
396 /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
397 /// nothing was available, or `None` if the current thread is not part this pool.
398 pub fn yield_local(&self) -> Option<Yield> {
399 let curr = self.registry.current_thread()?;
400 Some(curr.yield_local())
401 }
402
403 pub(crate) fn wait_until_stopped(self) {
404 let registry = Arc::clone(&self.registry);
405 drop(self);
406 registry.wait_until_stopped();
407 }
408}
409
410impl Drop for ThreadPool {
411 fn drop(&mut self) {
412 self.registry.terminate();
413 }
414}
415
416impl fmt::Debug for ThreadPool {
417 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
418 fmt.debug_struct("ThreadPool")
419 .field("num_threads", &self.current_num_threads())
420 .field("id", &self.registry.id())
421 .finish()
422 }
423}
424
425/// If called from a Rayon worker thread, returns the index of that
426/// thread within its current pool; if not called from a Rayon thread,
427/// returns `None`.
428///
429/// The index for a given thread will not change over the thread's
430/// lifetime. However, multiple threads may share the same index if
431/// they are in distinct thread-pools.
432///
433/// See also: [the `ThreadPool::current_thread_index()` method].
434///
435/// [m]: struct.ThreadPool.html#method.current_thread_index
436///
437/// # Future compatibility note
438///
439/// Currently, every thread-pool (including the global
440/// thread-pool) has a fixed number of threads, but this may
441/// change in future Rayon versions (see [the `num_threads()` method
442/// for details][snt]). In that case, the index for a
443/// thread would not change during its lifetime, but thread
444/// indices may wind up being reused if threads are terminated and
445/// restarted.
446///
447/// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
448#[inline]
449pub fn current_thread_index() -> Option<usize> {
450 unsafe {
451 let curr = WorkerThread::current().as_ref()?;
452 Some(curr.index())
453 }
454}
455
456/// If called from a Rayon worker thread, indicates whether that
457/// thread's local deque still has pending tasks. Otherwise, returns
458/// `None`. For more information, see [the
459/// `ThreadPool::current_thread_has_pending_tasks()` method][m].
460///
461/// [m]: struct.ThreadPool.html#method.current_thread_has_pending_tasks
462#[inline]
463pub fn current_thread_has_pending_tasks() -> Option<bool> {
464 unsafe {
465 let curr = WorkerThread::current().as_ref()?;
466 Some(!curr.local_deque_is_empty())
467 }
468}
469
470/// Cooperatively yields execution to Rayon.
471///
472/// If the current thread is part of a rayon thread pool, this looks for a
473/// single unit of pending work in the pool, then executes it. Completion of
474/// that work might include nested work or further work stealing.
475///
476/// This is similar to [`std::thread::yield_now()`], but does not literally make
477/// that call. If you are implementing a polling loop, you may want to also
478/// yield to the OS scheduler yourself if no Rayon work was found.
479///
480/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
481/// nothing was available, or `None` if this thread is not part of any pool at all.
482pub fn yield_now() -> Option<Yield> {
483 unsafe {
484 let thread = WorkerThread::current().as_ref()?;
485 Some(thread.yield_now())
486 }
487}
488
489/// Cooperatively yields execution to local Rayon work.
490///
491/// If the current thread is part of a rayon thread pool, this looks for a
492/// single unit of pending work in this thread's queue, then executes it.
493/// Completion of that work might include nested work or further work stealing.
494///
495/// This is similar to [`yield_now()`], but does not steal from other threads.
496///
497/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
498/// nothing was available, or `None` if this thread is not part of any pool at all.
499pub fn yield_local() -> Option<Yield> {
500 unsafe {
501 let thread = WorkerThread::current().as_ref()?;
502 Some(thread.yield_local())
503 }
504}
505
506/// Result of [`yield_now()`] or [`yield_local()`].
507#[derive(Clone, Copy, Debug, PartialEq, Eq)]
508pub enum Yield {
509 /// Work was found and executed.
510 Executed,
511 /// No available work was found.
512 Idle,
513}