// rustc_thread_pool/sleep/mod.rs

1//! Code that decides when workers should go to sleep. See README.md
2//! for an overview.
3
4use std::sync::atomic::Ordering;
5use std::sync::{Condvar, Mutex};
6use std::thread;
7
8use crossbeam_utils::CachePadded;
9
10use crate::DeadlockHandler;
11use crate::latch::CoreLatch;
12use crate::registry::WorkerThread;
13
14mod counters;
15pub(crate) use self::counters::THREADS_MAX;
16use self::counters::{AtomicCounters, JobsEventCounter};
17
/// Mutex-protected bookkeeping used for deadlock detection: tracks how many
/// workers are actively running versus blocked in user code.
struct SleepData {
    /// The number of threads in the thread pool.
    worker_count: usize,

    /// The number of threads in the thread pool which are running and
    /// aren't blocked in user code or sleeping.
    active_threads: usize,

    /// The number of threads which are blocked in user code.
    /// This doesn't include threads blocked by this module.
    blocked_threads: usize,
}
30
31impl SleepData {
32    /// Checks if the conditions for a deadlock holds and if so calls the deadlock handler
33    #[inline]
34    pub(super) fn deadlock_check(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
35        if self.active_threads == 0 && self.blocked_threads > 0 {
36            (deadlock_handler.as_ref().unwrap())();
37        }
38    }
39}
40
/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
/// of workers. It has callbacks that are invoked periodically at significant events,
/// such as when workers are looping and looking for work, when latches are set, or when
/// jobs are published, and it either blocks threads or wakes them in response to these
/// events. See the [`README.md`] in this module for more details.
///
/// [`README.md`]: README.md
pub(super) struct Sleep {
    /// One "sleep state" per worker. Used to track if a worker is sleeping and to have
    /// them block.
    worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,

    /// Atomic bookkeeping of idle/sleeping worker counts and the jobs event counter.
    counters: AtomicCounters,

    /// Deadlock-detection bookkeeping (active vs. blocked workers).
    data: Mutex<SleepData>,
}
57
/// An instance of this struct is created when a thread becomes idle.
/// It is consumed when the thread finds work, and passed by `&mut`
/// reference for operations that preserve the idle state. (In other
/// words, producing one of these structs is evidence the thread is
/// idle.) It tracks state such as how long the thread has been idle.
pub(super) struct IdleState {
    /// The worker index of the idle thread.
    worker_index: usize,

    /// How many rounds have we been circling without sleeping?
    rounds: u32,

    /// Once we become sleepy, what was the sleepy counter value?
    /// Set to `JobsEventCounter::DUMMY` otherwise.
    jobs_counter: JobsEventCounter,
}
74
/// The "sleep state" for an individual worker.
#[derive(Default)]
struct WorkerSleepState {
    /// Set to true when the worker goes to sleep; set to false when
    /// the worker is notified or when it wakes.
    is_blocked: Mutex<bool>,

    /// Signalled (after `is_blocked` is set back to false under its mutex)
    /// to wake the sleeping worker.
    condvar: Condvar,
}
84
/// Number of rounds an idle worker spins (yielding each time) before it
/// announces itself as "sleepy" and records the jobs event counter.
const ROUNDS_UNTIL_SLEEPY: u32 = 32;
/// One additional round after becoming sleepy before actually going to sleep.
const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
87
88impl Sleep {
89    pub(super) fn new(n_threads: usize) -> Sleep {
90        assert!(n_threads <= THREADS_MAX);
91        Sleep {
92            worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
93            counters: AtomicCounters::new(),
94            data: Mutex::new(SleepData {
95                worker_count: n_threads,
96                active_threads: n_threads,
97                blocked_threads: 0,
98            }),
99        }
100    }
101
102    /// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
103    /// if no other worker thread is active
104    #[inline]
105    pub(super) fn mark_blocked(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
106        let mut data = self.data.lock().unwrap();
107        debug_assert!(data.active_threads > 0);
108        debug_assert!(data.blocked_threads < data.worker_count);
109        debug_assert!(data.active_threads > 0);
110        data.active_threads -= 1;
111        data.blocked_threads += 1;
112
113        data.deadlock_check(deadlock_handler);
114    }
115
116    /// Mark a previously blocked Rayon worker thread as unblocked
117    #[inline]
118    pub(super) fn mark_unblocked(&self) {
119        let mut data = self.data.lock().unwrap();
120        debug_assert!(data.active_threads < data.worker_count);
121        debug_assert!(data.blocked_threads > 0);
122        data.active_threads += 1;
123        data.blocked_threads -= 1;
124    }
125
126    #[inline]
127    pub(super) fn start_looking(&self, worker_index: usize) -> IdleState {
128        self.counters.add_inactive_thread();
129
130        IdleState { worker_index, rounds: 0, jobs_counter: JobsEventCounter::DUMMY }
131    }
132
133    #[inline]
134    pub(super) fn work_found(&self) {
135        // If we were the last idle thread and other threads are still sleeping,
136        // then we should wake up another thread.
137        let threads_to_wake = self.counters.sub_inactive_thread();
138        self.wake_any_threads(threads_to_wake as u32);
139    }
140
141    #[inline]
142    pub(super) fn no_work_found(
143        &self,
144        idle_state: &mut IdleState,
145        latch: &CoreLatch,
146        thread: &WorkerThread,
147        steal: bool,
148    ) {
149        if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
150            thread::yield_now();
151            idle_state.rounds += 1;
152        } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
153            idle_state.jobs_counter = self.announce_sleepy();
154            idle_state.rounds += 1;
155            thread::yield_now();
156        } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
157            idle_state.rounds += 1;
158            thread::yield_now();
159        } else {
160            debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
161            self.sleep(idle_state, latch, thread, steal);
162        }
163    }
164
165    #[cold]
166    fn announce_sleepy(&self) -> JobsEventCounter {
167        self.counters.increment_jobs_event_counter_if(JobsEventCounter::is_active).jobs_counter()
168    }
169
170    #[cold]
171    fn sleep(
172        &self,
173        idle_state: &mut IdleState,
174        latch: &CoreLatch,
175        thread: &WorkerThread,
176        steal: bool,
177    ) {
178        let worker_index = idle_state.worker_index;
179
180        if !latch.get_sleepy() {
181            return;
182        }
183
184        let sleep_state = &self.worker_sleep_states[worker_index];
185        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
186        debug_assert!(!*is_blocked);
187
188        // Our latch was signalled. We should wake back up fully as we
189        // will have some stuff to do.
190        if !latch.fall_asleep() {
191            idle_state.wake_fully();
192            return;
193        }
194
195        loop {
196            let counters = self.counters.load(Ordering::SeqCst);
197
198            // Check if the JEC has changed since we got sleepy.
199            debug_assert!(idle_state.jobs_counter.is_sleepy());
200            if counters.jobs_counter() != idle_state.jobs_counter {
201                // JEC has changed, so a new job was posted, but for some reason
202                // we didn't see it. We should return to just before the SLEEPY
203                // state so we can do another search and (if we fail to find
204                // work) go back to sleep.
205                idle_state.wake_partly();
206                latch.wake_up();
207                return;
208            }
209
210            // Otherwise, let's move from IDLE to SLEEPING.
211            if self.counters.try_add_sleeping_thread(counters) {
212                break;
213            }
214        }
215
216        // Successfully registered as asleep.
217
218        // We have one last check for injected jobs to do. This protects against
219        // deadlock in the very unlikely event that
220        //
221        // - an external job is being injected while we are sleepy
222        // - that job triggers the rollover over the JEC such that we don't see it
223        // - we are the last active worker thread
224        std::sync::atomic::fence(Ordering::SeqCst);
225        if steal && thread.has_injected_job() {
226            // If we see an externally injected job, then we have to 'wake
227            // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
228            // the one that wakes us.)
229            self.counters.sub_sleeping_thread();
230        } else {
231            {
232                // Decrement the number of active threads and check for a deadlock
233                let mut data = self.data.lock().unwrap();
234                data.active_threads -= 1;
235                data.deadlock_check(&thread.registry.deadlock_handler);
236            }
237
238            // If we don't see an injected job (the normal case), then flag
239            // ourselves as asleep and wait till we are notified.
240            //
241            // (Note that `is_blocked` is held under a mutex and the mutex was
242            // acquired *before* we incremented the "sleepy counter". This means
243            // that whomever is coming to wake us will have to wait until we
244            // release the mutex in the call to `wait`, so they will see this
245            // boolean as true.)
246            thread.registry.release_thread();
247            *is_blocked = true;
248            while *is_blocked {
249                is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
250            }
251
252            // Drop `is_blocked` now in case `acquire_thread` blocks
253            drop(is_blocked);
254
255            thread.registry.acquire_thread();
256        }
257
258        // Update other state:
259        idle_state.wake_fully();
260        latch.wake_up();
261    }
262
263    /// Notify the given thread that it should wake up (if it is
264    /// sleeping). When this method is invoked, we typically know the
265    /// thread is asleep, though in rare cases it could have been
266    /// awoken by (e.g.) new work having been posted.
267    pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
268        self.wake_specific_thread(target_worker_index);
269    }
270
271    /// Signals that `num_jobs` new jobs were injected into the thread
272    /// pool from outside. This function will ensure that there are
273    /// threads available to process them, waking threads from sleep
274    /// if necessary.
275    ///
276    /// # Parameters
277    ///
278    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
279    ///   We'll try to get at least one thread per job.
280    #[inline]
281    pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
282        // This fence is needed to guarantee that threads
283        // as they are about to fall asleep, observe any
284        // new jobs that may have been injected.
285        std::sync::atomic::fence(Ordering::SeqCst);
286
287        self.new_jobs(num_jobs, queue_was_empty)
288    }
289
290    /// Signals that `num_jobs` new jobs were pushed onto a thread's
291    /// local deque. This function will try to ensure that there are
292    /// threads available to process them, waking threads from sleep
293    /// if necessary. However, this is not guaranteed: under certain
294    /// race conditions, the function may fail to wake any new
295    /// threads; in that case the existing thread should eventually
296    /// pop the job.
297    ///
298    /// # Parameters
299    ///
300    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
301    ///   We'll try to get at least one thread per job.
302    #[inline]
303    pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
304        self.new_jobs(num_jobs, queue_was_empty)
305    }
306
307    /// Common helper for `new_injected_jobs` and `new_internal_jobs`.
308    #[inline]
309    fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
310        // Read the counters and -- if sleepy workers have announced themselves
311        // -- announce that there is now work available. The final value of `counters`
312        // with which we exit the loop thus corresponds to a state when
313        let counters = self.counters.increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
314        let num_awake_but_idle = counters.awake_but_idle_threads();
315        let num_sleepers = counters.sleeping_threads();
316
317        if num_sleepers == 0 {
318            // nobody to wake
319            return;
320        }
321
322        // Promote from u16 to u32 so we can interoperate with
323        // num_jobs more easily.
324        let num_awake_but_idle = num_awake_but_idle as u32;
325        let num_sleepers = num_sleepers as u32;
326
327        // If the queue is non-empty, then we always wake up a worker
328        // -- clearly the existing idle jobs aren't enough. Otherwise,
329        // check to see if we have enough idle workers.
330        if !queue_was_empty {
331            let num_to_wake = Ord::min(num_jobs, num_sleepers);
332            self.wake_any_threads(num_to_wake);
333        } else if num_awake_but_idle < num_jobs {
334            let num_to_wake = Ord::min(num_jobs - num_awake_but_idle, num_sleepers);
335            self.wake_any_threads(num_to_wake);
336        }
337    }
338
339    #[cold]
340    fn wake_any_threads(&self, mut num_to_wake: u32) {
341        if num_to_wake > 0 {
342            for i in 0..self.worker_sleep_states.len() {
343                if self.wake_specific_thread(i) {
344                    num_to_wake -= 1;
345                    if num_to_wake == 0 {
346                        return;
347                    }
348                }
349            }
350        }
351    }
352
353    fn wake_specific_thread(&self, index: usize) -> bool {
354        let sleep_state = &self.worker_sleep_states[index];
355
356        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
357        if *is_blocked {
358            *is_blocked = false;
359
360            // Increment the number of active threads
361            self.data.lock().unwrap().active_threads += 1;
362
363            sleep_state.condvar.notify_one();
364
365            // When the thread went to sleep, it will have incremented
366            // this value. When we wake it, its our job to decrement
367            // it. We could have the thread do it, but that would
368            // introduce a delay between when the thread was
369            // *notified* and when this counter was decremented. That
370            // might mislead people with new work into thinking that
371            // there are sleeping threads that they should try to
372            // wake, when in fact there is nothing left for them to
373            // do.
374            self.counters.sub_sleeping_thread();
375
376            true
377        } else {
378            false
379        }
380    }
381}
382
383impl IdleState {
384    fn wake_fully(&mut self) {
385        self.rounds = 0;
386        self.jobs_counter = JobsEventCounter::DUMMY;
387    }
388
389    fn wake_partly(&mut self) {
390        self.rounds = ROUNDS_UNTIL_SLEEPY;
391        self.jobs_counter = JobsEventCounter::DUMMY;
392    }
393}