rustc_thread_pool/lib.rs
//! Rayon-core houses the core stable APIs of Rayon.
//!
//! These APIs have been mirrored in the Rayon crate and it is recommended to use them from there.
//!
//! [`join`] is used to take two closures and potentially run them in parallel.
//! - It will run in parallel if task B gets stolen before task A can finish.
//! - It will run sequentially if task A finishes before task B is stolen; the calling thread then
//!   continues with task B itself.
//!
//! [`scope`] creates a scope in which you can run any number of parallel tasks.
//! These tasks can spawn nested tasks and scopes, but given the nature of work stealing, the order of execution cannot be guaranteed.
//! The scope will exist until all tasks spawned within the scope have been completed.
//!
//! [`spawn`] adds a task to the 'static' or 'global' scope, or to a local scope created by the [`scope()`] function.
//!
//! [`ThreadPool`] can be used to create your own thread pools (using [`ThreadPoolBuilder`]) or to customize the global one.
//! Tasks spawned within the pool (using [`install()`], [`join()`], etc.) will be added to a deque,
//! where they become available for work stealing from other threads in the local threadpool.
//!
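//! A minimal sketch of the two most common entry points (not a full tour of the APIs above):
//!
//! ```
//! # use rustc_thread_pool as rayon;
//! // Two independent computations; Rayon may run them on different threads.
//! let (sum, product) = rayon::join(
//!     || (1..=10).sum::<u32>(),
//!     || (1..=10).product::<u32>(),
//! );
//! assert_eq!(sum, 55);
//! assert_eq!(product, 3_628_800);
//!
//! // A scope: every task spawned inside is guaranteed to finish before `scope` returns.
//! let mut results = vec![0u32; 4];
//! rayon::scope(|s| {
//!     for (i, slot) in results.iter_mut().enumerate() {
//!         s.spawn(move |_| *slot = (i as u32) * 2);
//!     }
//! });
//! assert_eq!(results, [0, 2, 4, 6]);
//! ```
//!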
//! [`join`]: fn.join.html
//! [`scope`]: fn.scope.html
//! [`scope()`]: fn.scope.html
//! [`spawn`]: fn.spawn.html
//! [`ThreadPool`]: struct.ThreadPool.html
//! [`install()`]: struct.ThreadPool.html#method.install
//! [`spawn()`]: struct.ThreadPool.html#method.spawn
//! [`join()`]: struct.ThreadPool.html#method.join
//! [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
//!
//! # Global fallback when threading is unsupported
//!
//! Rayon uses `std` APIs for threading, but some targets have incomplete implementations that
//! always return `Unsupported` errors. The WebAssembly `wasm32-unknown-unknown` and `wasm32-wasi`
//! targets are notable examples of this. Rather than panicking on the unsupported error when
//! creating the implicit global threadpool, Rayon configures a fallback mode instead.
//!
//! This fallback mode mostly functions as if it were using a single-threaded "pool", like setting
//! `RAYON_NUM_THREADS=1`. For example, `join` will execute its two closures sequentially, since
//! there is no other thread to share the work. However, since the pool is not running independently
//! of the main thread, non-blocking calls like `spawn` may not execute at all, unless a
//! lower-priority call like `broadcast` gives them an opening. The fallback mode does not try to
//! emulate anything like thread preemption or `async` task switching, but `yield_now` or
//! `yield_local` can also volunteer execution time.
//!
//! Explicit `ThreadPoolBuilder` methods always report their error without any fallback.
//!
//! # Restricting multiple versions
//!
//! In order to ensure proper coordination between threadpools, and especially
//! to make sure there's only one global threadpool, `rayon-core` is actively
//! restricted from building multiple versions of itself into a single target.
//! You may see a build error like this in violation:
//!
//! ```text
//! error: native library `rayon-core` is being linked to by more
//! than one package, and can only be linked to by one package
//! ```
//!
//! While we strive to keep `rayon-core` semver-compatible, it's still
//! possible to arrive at this situation if different crates have overly
//! restrictive tilde or inequality requirements for `rayon-core`. The
//! conflicting requirements will need to be resolved before the build will
//! succeed.

#![cfg_attr(test, allow(unused_crate_dependencies))]
#![warn(rust_2018_idioms)]

use std::any::Any;
use std::error::Error;
use std::marker::PhantomData;
use std::str::FromStr;
use std::{env, fmt, io, thread};

#[macro_use]
mod private;

mod broadcast;
mod job;
mod join;
mod latch;
mod registry;
mod scope;
mod sleep;
mod spawn;
mod thread_pool;
mod unwind;
mod worker_local;

mod compile_fail;
mod tests;

pub mod tlv;

pub use worker_local::WorkerLocal;

pub use self::broadcast::{BroadcastContext, broadcast, spawn_broadcast};
pub use self::join::{join, join_context};
use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
pub use self::registry::{Registry, ThreadBuilder, mark_blocked, mark_unblocked};
pub use self::scope::{Scope, ScopeFifo, in_place_scope, in_place_scope_fifo, scope, scope_fifo};
pub use self::spawn::{spawn, spawn_fifo};
pub use self::thread_pool::{
    ThreadPool, Yield, current_thread_has_pending_tasks, current_thread_index, yield_local,
    yield_now,
};

/// Returns the maximum number of threads that Rayon supports in a single thread-pool.
///
/// If a higher thread count is requested by calling `ThreadPoolBuilder::num_threads` or by setting
/// the `RAYON_NUM_THREADS` environment variable, then it will be reduced to this maximum.
///
/// The value may vary between different targets, and is subject to change in new Rayon versions.
pub fn max_num_threads() -> usize {
    // We are limited by the bits available in the sleep counter's `AtomicUsize`.
    crate::sleep::THREADS_MAX
}

/// Returns the number of threads in the current registry. If this
/// code is executing within a Rayon thread-pool, then this will be
/// the number of threads for the thread-pool of the current
/// thread. Otherwise, it will be the number of threads for the global
/// thread-pool.
///
/// This can be useful when trying to judge how many times to split
/// parallel work (the parallel iterator traits use this value
/// internally for this purpose).
///
/// # Future compatibility note
///
/// Note that unless this thread-pool was created with a
/// builder that specifies the number of threads, then this
/// number may vary over time in future versions (see [the
/// `num_threads()` method for details][snt]).
///
/// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
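///
/// # Examples
///
/// A minimal sketch of using the thread count to choose a chunk size
/// (`ITEMS` is just an illustrative constant, not part of this crate):
///
/// ```
/// # use rustc_thread_pool as rayon;
/// const ITEMS: usize = 1024;
/// let threads = rayon::current_num_threads();
/// // Aim for roughly one chunk of work per worker thread.
/// let chunk_size = (ITEMS + threads - 1) / threads;
/// assert!(chunk_size >= 1);
/// ```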
pub fn current_num_threads() -> usize {
    crate::registry::Registry::current_num_threads()
}

/// Error when initializing a thread pool.
#[derive(Debug)]
pub struct ThreadPoolBuildError {
    kind: ErrorKind,
}

#[derive(Debug)]
enum ErrorKind {
    GlobalPoolAlreadyInitialized,
    IOError(io::Error),
}

/// Used to create a new [`ThreadPool`] or to configure the global rayon thread pool.
/// ## Creating a ThreadPool
/// The following creates a thread pool with 22 threads.
///
/// ```rust
/// # use rustc_thread_pool as rayon;
/// let pool = rayon::ThreadPoolBuilder::new().num_threads(22).build().unwrap();
/// ```
///
/// To instead configure the global thread pool, use [`build_global()`]:
///
/// ```rust
/// # use rustc_thread_pool as rayon;
/// rayon::ThreadPoolBuilder::new().num_threads(22).build_global().unwrap();
/// ```
///
/// [`ThreadPool`]: struct.ThreadPool.html
/// [`build_global()`]: struct.ThreadPoolBuilder.html#method.build_global
pub struct ThreadPoolBuilder<S = DefaultSpawn> {
    /// The number of threads in the rayon thread pool.
    /// If zero, the `RAYON_NUM_THREADS` environment variable is used.
    /// If `RAYON_NUM_THREADS` is invalid or zero, the default is used.
    num_threads: usize,

    /// Custom closure, if any, to handle a panic that we cannot propagate
    /// anywhere else.
    panic_handler: Option<Box<PanicHandler>>,

    /// Closure to compute the name of a thread.
    get_thread_name: Option<Box<dyn FnMut(usize) -> String>>,

    /// The stack size for the created worker threads.
    stack_size: Option<usize>,

    /// Closure invoked on deadlock.
    deadlock_handler: Option<Box<DeadlockHandler>>,

    /// Closure invoked on worker thread start.
    start_handler: Option<Box<StartHandler>>,

    /// Closure invoked on worker thread exit.
    exit_handler: Option<Box<ExitHandler>>,

    /// Closure invoked to spawn threads.
    spawn_handler: S,

    /// Closure invoked when starting computations in a thread.
    acquire_thread_handler: Option<Box<AcquireThreadHandler>>,

    /// Closure invoked when blocking in a thread.
    release_thread_handler: Option<Box<ReleaseThreadHandler>>,

    /// If false, worker threads will execute spawned jobs in a
    /// "depth-first" fashion. If true, they will execute them in a
    /// "breadth-first" fashion. Depth-first is the default.
    breadth_first: bool,
}

/// Contains the rayon thread pool configuration. Use [`ThreadPoolBuilder`] instead.
///
/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
#[deprecated(note = "Use `ThreadPoolBuilder`")]
#[derive(Default)]
pub struct Configuration {
    builder: ThreadPoolBuilder,
}

/// The type for a panic handling closure. Note that this same closure
/// may be invoked multiple times in parallel.
type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync;

/// The type for a closure that gets invoked when the Rayon thread pool deadlocks.
type DeadlockHandler = dyn Fn() + Send + Sync;

/// The type for a closure that gets invoked when a thread starts. The
/// closure is passed the index of the thread on which it is invoked.
/// Note that this same closure may be invoked multiple times in parallel.
type StartHandler = dyn Fn(usize) + Send + Sync;

/// The type for a closure that gets invoked when a thread exits. The
/// closure is passed the index of the thread on which it is invoked.
/// Note that this same closure may be invoked multiple times in parallel.
type ExitHandler = dyn Fn(usize) + Send + Sync;

// NB: We can't `#[derive(Default)]` because `S` is left ambiguous.
impl Default for ThreadPoolBuilder {
    fn default() -> Self {
        ThreadPoolBuilder {
            num_threads: 0,
            panic_handler: None,
            get_thread_name: None,
            stack_size: None,
            start_handler: None,
            exit_handler: None,
            deadlock_handler: None,
            acquire_thread_handler: None,
            release_thread_handler: None,
            spawn_handler: DefaultSpawn,
            breadth_first: false,
        }
    }
}

/// The type for a closure that gets invoked before starting computations in a thread.
/// Note that this same closure may be invoked multiple times in parallel.
type AcquireThreadHandler = dyn Fn() + Send + Sync;

/// The type for a closure that gets invoked before blocking in a thread.
/// Note that this same closure may be invoked multiple times in parallel.
type ReleaseThreadHandler = dyn Fn() + Send + Sync;

impl ThreadPoolBuilder {
    /// Creates and returns a valid rayon thread pool builder, but does not initialize it.
    pub fn new() -> Self {
        Self::default()
    }
}

/// Note: the `S: ThreadSpawn` constraint is an internal implementation detail for the
/// default spawn and those set by [`spawn_handler`](#method.spawn_handler).
impl<S> ThreadPoolBuilder<S>
where
    S: ThreadSpawn,
{
    /// Creates a new `ThreadPool` initialized using this configuration.
    pub fn build(self) -> Result<ThreadPool, ThreadPoolBuildError> {
        ThreadPool::build(self)
    }

    /// Initializes the global thread pool. This initialization is
    /// **optional**. If you do not call this function, the thread pool
    /// will be automatically initialized with the default
    /// configuration. Calling `build_global` is not recommended, except
    /// in two scenarios:
    ///
    /// - You wish to change the default configuration.
    /// - You are running a benchmark, in which case initializing may
    ///   yield slightly more consistent results, since the worker threads
    ///   will already be ready to go even in the first iteration. But
    ///   this cost is minimal.
    ///
    /// Initialization of the global thread pool happens exactly
    /// once. Once started, the configuration cannot be
    /// changed. Therefore, if you call `build_global` a second time, it
    /// will return an error. An `Ok` result indicates that this
    /// is the first initialization of the thread pool.
    pub fn build_global(self) -> Result<(), ThreadPoolBuildError> {
        let registry = registry::init_global_registry(self)?;
        registry.wait_until_primed();
        Ok(())
    }
}

impl ThreadPoolBuilder {
    /// Creates a scoped `ThreadPool` initialized using this configuration.
    ///
    /// This is a convenience function for building a pool using [`std::thread::scope`]
    /// to spawn threads in a [`spawn_handler`](#method.spawn_handler).
    /// The threads in this pool will start by calling `wrapper`, which should
    /// do initialization and continue by calling `ThreadBuilder::run()`.
    ///
    /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html
    ///
    /// # Examples
    ///
    /// A scoped pool may be useful in combination with scoped thread-local variables.
    ///
    /// ```
    /// # use rustc_thread_pool as rayon;
    ///
    /// scoped_tls::scoped_thread_local!(static POOL_DATA: Vec<i32>);
    ///
    /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
    ///     let pool_data = vec![1, 2, 3];
    ///
    ///     // We haven't assigned any TLS data yet.
    ///     assert!(!POOL_DATA.is_set());
    ///
    ///     rayon::ThreadPoolBuilder::new()
    ///         .build_scoped(
    ///             // Borrow `pool_data` in TLS for each thread.
    ///             |thread| POOL_DATA.set(&pool_data, || thread.run()),
    ///             // Do some work that needs the TLS data.
    ///             |pool| pool.install(|| assert!(POOL_DATA.is_set())),
    ///         )?;
    ///
    ///     // Once we've returned, `pool_data` is no longer borrowed.
    ///     drop(pool_data);
    ///     Ok(())
    /// }
    /// ```
    pub fn build_scoped<W, F, R>(self, wrapper: W, with_pool: F) -> Result<R, ThreadPoolBuildError>
    where
        W: Fn(ThreadBuilder) + Sync, // expected to call `run()`
        F: FnOnce(&ThreadPool) -> R,
    {
        std::thread::scope(|scope| {
            let pool = self
                .spawn_handler(|thread| {
                    let mut builder = std::thread::Builder::new();
                    if let Some(name) = thread.name() {
                        builder = builder.name(name.to_string());
                    }
                    if let Some(size) = thread.stack_size() {
                        builder = builder.stack_size(size);
                    }
                    builder.spawn_scoped(scope, || wrapper(thread))?;
                    Ok(())
                })
                .build()?;
            let result = unwind::halt_unwinding(|| with_pool(&pool));
            pool.wait_until_stopped();
            match result {
                Ok(result) => Ok(result),
                Err(err) => unwind::resume_unwinding(err),
            }
        })
    }
}

impl<S> ThreadPoolBuilder<S> {
    /// Sets a custom function for spawning threads.
    ///
    /// Note that the threads will not exit until after the pool is dropped. It
    /// is up to the caller to wait for thread termination if that is important
    /// for any invariants. For instance, threads created in [`std::thread::scope`]
    /// will be joined before that scope returns, and this will block indefinitely
    /// if the pool is leaked. Furthermore, the global thread pool doesn't terminate
    /// until the entire process exits!
    ///
    /// # Examples
    ///
    /// A minimal spawn handler just needs to call `run()` from an independent thread.
    ///
    /// ```
    /// # use rustc_thread_pool as rayon;
    /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
    ///     let pool = rayon::ThreadPoolBuilder::new()
    ///         .spawn_handler(|thread| {
    ///             std::thread::spawn(|| thread.run());
    ///             Ok(())
    ///         })
    ///         .build()?;
    ///
    ///     pool.install(|| println!("Hello from my custom thread!"));
    ///     Ok(())
    /// }
    /// ```
    ///
    /// The default spawn handler sets the name and stack size if given, and propagates
    /// any errors from the thread builder.
    ///
    /// ```
    /// # use rustc_thread_pool as rayon;
    /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
    ///     let pool = rayon::ThreadPoolBuilder::new()
    ///         .spawn_handler(|thread| {
    ///             let mut b = std::thread::Builder::new();
    ///             if let Some(name) = thread.name() {
    ///                 b = b.name(name.to_owned());
    ///             }
    ///             if let Some(stack_size) = thread.stack_size() {
    ///                 b = b.stack_size(stack_size);
    ///             }
    ///             b.spawn(|| thread.run())?;
    ///             Ok(())
    ///         })
    ///         .build()?;
    ///
    ///     pool.install(|| println!("Hello from my fully custom thread!"));
    ///     Ok(())
    /// }
    /// ```
    ///
    /// This can also be used for a pool of scoped threads like [`crossbeam::scope`],
    /// or [`std::thread::scope`] introduced in Rust 1.63, which is encapsulated in
    /// [`build_scoped`](#method.build_scoped).
    ///
    /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html
    /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html
    ///
    /// ```
    /// # use rustc_thread_pool as rayon;
    /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
    ///     std::thread::scope(|scope| {
    ///         let pool = rayon::ThreadPoolBuilder::new()
    ///             .spawn_handler(|thread| {
    ///                 let mut builder = std::thread::Builder::new();
    ///                 if let Some(name) = thread.name() {
    ///                     builder = builder.name(name.to_string());
    ///                 }
    ///                 if let Some(size) = thread.stack_size() {
    ///                     builder = builder.stack_size(size);
    ///                 }
    ///                 builder.spawn_scoped(scope, || {
    ///                     // Add any scoped initialization here, then run!
    ///                     thread.run()
    ///                 })?;
    ///                 Ok(())
    ///             })
    ///             .build()?;
    ///
    ///         pool.install(|| println!("Hello from my custom scoped thread!"));
    ///         Ok(())
    ///     })
    /// }
    /// ```
    pub fn spawn_handler<F>(self, spawn: F) -> ThreadPoolBuilder<CustomSpawn<F>>
    where
        F: FnMut(ThreadBuilder) -> io::Result<()>,
    {
        ThreadPoolBuilder {
            spawn_handler: CustomSpawn::new(spawn),
            // ..self
            num_threads: self.num_threads,
            panic_handler: self.panic_handler,
            get_thread_name: self.get_thread_name,
            stack_size: self.stack_size,
            start_handler: self.start_handler,
            exit_handler: self.exit_handler,
            deadlock_handler: self.deadlock_handler,
            acquire_thread_handler: self.acquire_thread_handler,
            release_thread_handler: self.release_thread_handler,
            breadth_first: self.breadth_first,
        }
    }

    /// Returns a mutable reference to the current spawn handler.
    fn get_spawn_handler(&mut self) -> &mut S {
        &mut self.spawn_handler
    }

    /// Get the number of threads that will be used for the thread
    /// pool. See `num_threads()` for more information.
    fn get_num_threads(&self) -> usize {
        if self.num_threads > 0 {
            self.num_threads
        } else {
            let default = || thread::available_parallelism().map(|n| n.get()).unwrap_or(1);

            match env::var("RAYON_NUM_THREADS").ok().and_then(|s| usize::from_str(&s).ok()) {
                Some(x @ 1..) => return x,
                Some(0) => return default(),
                _ => {}
            }

            // Support for deprecated `RAYON_RS_NUM_CPUS`.
            match env::var("RAYON_RS_NUM_CPUS").ok().and_then(|s| usize::from_str(&s).ok()) {
                Some(x @ 1..) => x,
                _ => default(),
            }
        }
    }

    /// Get the thread name for the thread with the given index.
    fn get_thread_name(&mut self, index: usize) -> Option<String> {
        let f = self.get_thread_name.as_mut()?;
        Some(f(index))
    }

    /// Sets a closure which takes a thread index and returns
    /// the thread's name.
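    ///
    /// For example, a small sketch that names workers `worker-0`, `worker-1`, and so on
    /// (the `worker-` prefix is arbitrary, not something this crate prescribes):
    ///
    /// ```
    /// # use rustc_thread_pool as rayon;
    /// let pool = rayon::ThreadPoolBuilder::new()
    ///     .thread_name(|index| format!("worker-{}", index))
    ///     .num_threads(2)
    ///     .build()
    ///     .unwrap();
    /// # drop(pool);
    /// ```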
    pub fn thread_name<F>(mut self, closure: F) -> Self
    where
        F: FnMut(usize) -> String + 'static,
    {
        self.get_thread_name = Some(Box::new(closure));
        self
    }

    /// Sets the number of threads to be used in the rayon threadpool.
    ///
    /// If you specify a non-zero number of threads using this
    /// function, then the resulting thread-pools are guaranteed to
    /// start at most this number of threads.
    ///
    /// If `num_threads` is 0, or you do not call this function, then
    /// the Rayon runtime will select the number of threads
    /// automatically. At present, this is based on the
    /// `RAYON_NUM_THREADS` environment variable (if set),
    /// or the number of logical CPUs (otherwise).
    /// In the future, however, the default behavior may
    /// change to dynamically add or remove threads as needed.
    ///
    /// **Future compatibility warning:** Given the default behavior
    /// may change in the future, if you wish to rely on a fixed
    /// number of threads, you should use this function to specify
    /// that number. To reproduce the current default behavior, you
    /// may wish to use [`std::thread::available_parallelism`]
    /// to query the number of CPUs dynamically.
    ///
    /// **Old environment variable:** `RAYON_NUM_THREADS` is a one-to-one
    /// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment
    /// variable. If both variables are specified, `RAYON_NUM_THREADS` will
    /// be preferred.
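    ///
    /// For example, a sketch that explicitly requests one thread per logical CPU
    /// (roughly the automatic default when no environment variable is set):
    ///
    /// ```
    /// # use rustc_thread_pool as rayon;
    /// let threads = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    /// let pool = rayon::ThreadPoolBuilder::new()
    ///     .num_threads(threads)
    ///     .build()
    ///     .unwrap();
    /// assert_eq!(pool.current_num_threads(), threads);
    /// ```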
    pub fn num_threads(mut self, num_threads: usize) -> Self {
        self.num_threads = num_threads;
        self
    }

    /// Takes the current panic handler, leaving `None`.
    fn take_panic_handler(&mut self) -> Option<Box<PanicHandler>> {
        self.panic_handler.take()
    }

    /// Normally, whenever Rayon catches a panic, it tries to
    /// propagate it to someplace sensible, to try and reflect the
    /// semantics of sequential execution. But in some cases,
    /// particularly with the `spawn()` APIs, there is no
    /// obvious place where we should propagate the panic to.
    /// In that case, this panic handler is invoked.
    ///
    /// If no panic handler is set, the default is to abort the
    /// process, under the principle that panics should not go
    /// unobserved.
    ///
    /// If the panic handler itself panics, this will abort the
    /// process. To prevent this, wrap the body of your panic handler
    /// in a call to `std::panic::catch_unwind()`.
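    ///
    /// For instance, a minimal sketch of a handler that only logs the payload
    /// (assuming string panic messages; anything else gets a placeholder):
    ///
    /// ```
    /// # use rustc_thread_pool as rayon;
    /// let pool = rayon::ThreadPoolBuilder::new()
    ///     .panic_handler(|err| {
    ///         let msg = err
    ///             .downcast_ref::<&str>()
    ///             .copied()
    ///             .unwrap_or("<non-string panic payload>");
    ///         eprintln!("a rayon worker panicked: {}", msg);
    ///     })
    ///     .build()
    ///     .unwrap();
    /// # drop(pool);
    /// ```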
    pub fn panic_handler<H>(mut self, panic_handler: H) -> Self
    where
        H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
    {
        self.panic_handler = Some(Box::new(panic_handler));
        self
    }

    /// Get the stack size of the worker threads
    fn get_stack_size(&self) -> Option<usize> {
        self.stack_size
    }

    /// Sets the stack size of the worker threads
    pub fn stack_size(mut self, stack_size: usize) -> Self {
        self.stack_size = Some(stack_size);
        self
    }

    /// **(DEPRECATED)** Suggest to worker threads that they execute
    /// spawned jobs in a "breadth-first" fashion.
    ///
    /// Typically, when a worker thread is idle or blocked, it will
    /// attempt to execute the job from the *top* of its local deque of
    /// work (i.e., the job most recently spawned). If this flag is set
    /// to true, however, workers will prefer to execute in a
    /// *breadth-first* fashion -- that is, they will search for jobs at
    /// the *bottom* of their local deque. (At present, workers *always*
    /// steal from the bottom of other workers' deques, regardless of
    /// the setting of this flag.)
    ///
    /// If you think of the tasks as a tree, where a parent task
    /// spawns its children in the tree, then this flag loosely
    /// corresponds to doing a breadth-first traversal of the tree,
    /// whereas the default would be to do a depth-first traversal.
    ///
    /// **Note that this is an "execution hint".** Rayon's task
    /// execution is highly dynamic and the precise order in which
    /// independent tasks are executed is not intended to be
    /// guaranteed.
    ///
    /// This `breadth_first()` method is now deprecated per [RFC #1],
    /// and in the future its effect may be removed. Consider using
    /// [`scope_fifo()`] for a similar effect.
    ///
    /// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
    /// [`scope_fifo()`]: fn.scope_fifo.html
    #[deprecated(note = "use `scope_fifo` and `spawn_fifo` for similar effect")]
    pub fn breadth_first(mut self) -> Self {
        self.breadth_first = true;
        self
    }

    fn get_breadth_first(&self) -> bool {
        self.breadth_first
    }

    /// Takes the current acquire thread callback, leaving `None`.
    fn take_acquire_thread_handler(&mut self) -> Option<Box<AcquireThreadHandler>> {
        self.acquire_thread_handler.take()
    }

    /// Sets a callback to be invoked when starting computations in a thread.
    pub fn acquire_thread_handler<H>(mut self, acquire_thread_handler: H) -> Self
    where
        H: Fn() + Send + Sync + 'static,
    {
        self.acquire_thread_handler = Some(Box::new(acquire_thread_handler));
        self
    }

    /// Takes the current release thread callback, leaving `None`.
    fn take_release_thread_handler(&mut self) -> Option<Box<ReleaseThreadHandler>> {
        self.release_thread_handler.take()
    }

    /// Sets a callback to be invoked when blocking in a thread.
    pub fn release_thread_handler<H>(mut self, release_thread_handler: H) -> Self
    where
        H: Fn() + Send + Sync + 'static,
    {
        self.release_thread_handler = Some(Box::new(release_thread_handler));
        self
    }

    /// Takes the current deadlock callback, leaving `None`.
    fn take_deadlock_handler(&mut self) -> Option<Box<DeadlockHandler>> {
        self.deadlock_handler.take()
    }

    /// Sets a callback to be invoked when the thread pool deadlocks.
    pub fn deadlock_handler<H>(mut self, deadlock_handler: H) -> Self
    where
        H: Fn() + Send + Sync + 'static,
    {
        self.deadlock_handler = Some(Box::new(deadlock_handler));
        self
    }

    /// Takes the current thread start callback, leaving `None`.
    fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
        self.start_handler.take()
    }

    /// Sets a callback to be invoked on thread start.
    ///
    /// The closure is passed the index of the thread on which it is invoked.
    /// Note that this same closure may be invoked multiple times in parallel.
    /// If this closure panics, the panic will be passed to the panic handler.
    /// If that handler returns, then startup will continue normally.
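    ///
    /// For example, a small sketch that announces each worker as it starts:
    ///
    /// ```
    /// # use rustc_thread_pool as rayon;
    /// let pool = rayon::ThreadPoolBuilder::new()
    ///     .num_threads(2)
    ///     .start_handler(|index| eprintln!("worker {} is starting", index))
    ///     .build()
    ///     .unwrap();
    /// # drop(pool);
    /// ```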
    pub fn start_handler<H>(mut self, start_handler: H) -> Self
    where
        H: Fn(usize) + Send + Sync + 'static,
    {
        self.start_handler = Some(Box::new(start_handler));
        self
    }

    /// Takes the current thread exit callback, leaving `None`.
    fn take_exit_handler(&mut self) -> Option<Box<ExitHandler>> {
        self.exit_handler.take()
    }

    /// Sets a callback to be invoked on thread exit.
    ///
    /// The closure is passed the index of the thread on which it is invoked.
    /// Note that this same closure may be invoked multiple times in parallel.
    /// If this closure panics, the panic will be passed to the panic handler.
    /// If that handler returns, then the thread will exit normally.
    pub fn exit_handler<H>(mut self, exit_handler: H) -> Self
    where
        H: Fn(usize) + Send + Sync + 'static,
    {
        self.exit_handler = Some(Box::new(exit_handler));
        self
    }
}

#[allow(deprecated)]
impl Configuration {
    /// Creates and returns a valid rayon thread pool configuration, but does not initialize it.
    pub fn new() -> Configuration {
        Configuration { builder: ThreadPoolBuilder::new() }
    }

    /// Deprecated in favor of `ThreadPoolBuilder::build`.
    pub fn build(self) -> Result<ThreadPool, Box<dyn Error + 'static>> {
        self.builder.build().map_err(Box::from)
    }

    /// Deprecated in favor of `ThreadPoolBuilder::thread_name`.
    pub fn thread_name<F>(mut self, closure: F) -> Self
    where
        F: FnMut(usize) -> String + 'static,
    {
        self.builder = self.builder.thread_name(closure);
        self
    }

    /// Deprecated in favor of `ThreadPoolBuilder::num_threads`.
    pub fn num_threads(mut self, num_threads: usize) -> Configuration {
        self.builder = self.builder.num_threads(num_threads);
        self
    }

    /// Deprecated in favor of `ThreadPoolBuilder::panic_handler`.
    pub fn panic_handler<H>(mut self, panic_handler: H) -> Configuration
    where
        H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
    {
        self.builder = self.builder.panic_handler(panic_handler);
        self
    }

    /// Deprecated in favor of `ThreadPoolBuilder::stack_size`.
    pub fn stack_size(mut self, stack_size: usize) -> Self {
        self.builder = self.builder.stack_size(stack_size);
        self
    }

    /// Deprecated in favor of `ThreadPoolBuilder::breadth_first`.
    pub fn breadth_first(mut self) -> Self {
        self.builder = self.builder.breadth_first();
        self
    }

    /// Deprecated in favor of `ThreadPoolBuilder::start_handler`.
    pub fn start_handler<H>(mut self, start_handler: H) -> Configuration
    where
        H: Fn(usize) + Send + Sync + 'static,
    {
        self.builder = self.builder.start_handler(start_handler);
        self
    }

    /// Deprecated in favor of `ThreadPoolBuilder::exit_handler`.
    pub fn exit_handler<H>(mut self, exit_handler: H) -> Configuration
    where
        H: Fn(usize) + Send + Sync + 'static,
    {
        self.builder = self.builder.exit_handler(exit_handler);
        self
    }

    /// Returns a `ThreadPoolBuilder` with identical parameters.
    fn into_builder(self) -> ThreadPoolBuilder {
        self.builder
    }
}

impl ThreadPoolBuildError {
    fn new(kind: ErrorKind) -> ThreadPoolBuildError {
        ThreadPoolBuildError { kind }
    }

    fn is_unsupported(&self) -> bool {
        matches!(&self.kind, ErrorKind::IOError(e) if e.kind() == io::ErrorKind::Unsupported)
    }
}

const GLOBAL_POOL_ALREADY_INITIALIZED: &str =
    "The global thread pool has already been initialized.";

impl Error for ThreadPoolBuildError {
    #[allow(deprecated)]
    fn description(&self) -> &str {
        match self.kind {
            ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED,
            ErrorKind::IOError(ref e) => e.description(),
        }
    }

    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match &self.kind {
            ErrorKind::GlobalPoolAlreadyInitialized => None,
            ErrorKind::IOError(e) => Some(e),
        }
    }
}

impl fmt::Display for ThreadPoolBuildError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match &self.kind {
            ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED.fmt(f),
            ErrorKind::IOError(e) => e.fmt(f),
        }
    }
}

/// Deprecated in favor of `ThreadPoolBuilder::build_global`.
#[deprecated(note = "use `ThreadPoolBuilder::build_global`")]
#[allow(deprecated)]
pub fn initialize(config: Configuration) -> Result<(), Box<dyn Error>> {
    config.into_builder().build_global().map_err(Box::from)
}

impl<S> fmt::Debug for ThreadPoolBuilder<S> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let ThreadPoolBuilder {
            ref num_threads,
            ref get_thread_name,
            ref panic_handler,
            ref stack_size,
            ref deadlock_handler,
            ref start_handler,
            ref exit_handler,
            ref acquire_thread_handler,
            ref release_thread_handler,
            spawn_handler: _,
            ref breadth_first,
        } = *self;

        // Just print `Some(<closure>)` or `None` to the debug
        // output.
        struct ClosurePlaceholder;
        impl fmt::Debug for ClosurePlaceholder {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                f.write_str("<closure>")
            }
        }
        let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder);
        let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder);
        let deadlock_handler = deadlock_handler.as_ref().map(|_| ClosurePlaceholder);
        let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
        let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
        let acquire_thread_handler = acquire_thread_handler.as_ref().map(|_| ClosurePlaceholder);
        let release_thread_handler = release_thread_handler.as_ref().map(|_| ClosurePlaceholder);

        f.debug_struct("ThreadPoolBuilder")
            .field("num_threads", num_threads)
            .field("get_thread_name", &get_thread_name)
            .field("panic_handler", &panic_handler)
            .field("stack_size", &stack_size)
            .field("deadlock_handler", &deadlock_handler)
            .field("start_handler", &start_handler)
            .field("exit_handler", &exit_handler)
            .field("acquire_thread_handler", &acquire_thread_handler)
            .field("release_thread_handler", &release_thread_handler)
            .field("breadth_first", &breadth_first)
            .finish()
    }
}

#[allow(deprecated)]
impl fmt::Debug for Configuration {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.builder.fmt(f)
    }
}

/// Provides the calling context to a closure called by `join_context`.
#[derive(Debug)]
pub struct FnContext {
    migrated: bool,

    /// Disable `Send` and `Sync`, just for a little future-proofing.
    _marker: PhantomData<*mut ()>,
}

impl FnContext {
    #[inline]
    fn new(migrated: bool) -> Self {
        FnContext { migrated, _marker: PhantomData }
    }
}

impl FnContext {
    /// Returns `true` if the closure was called from a different thread
    /// than it was provided from.
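    ///
    /// For example, a brief sketch with [`join_context`](fn.join_context.html), where each
    /// closure can ask whether it ended up on a different thread than the caller:
    ///
    /// ```
    /// # use rustc_thread_pool as rayon;
    /// let (a_migrated, b_migrated) = rayon::join_context(
    ///     |ctx| ctx.migrated(),
    ///     |ctx| ctx.migrated(),
    /// );
    /// // Whether either side migrated depends on work stealing at runtime.
    /// println!("a migrated: {}, b migrated: {}", a_migrated, b_migrated);
    /// ```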
    #[inline]
    pub fn migrated(&self) -> bool {
        self.migrated
    }
}