rustc_thread_pool/sleep/
mod.rs

1//! Code that decides when workers should go to sleep. See README.md
2//! for an overview.
3
4use std::sync::atomic::Ordering;
5use std::sync::{Condvar, Mutex};
6use std::thread;
7
8use crossbeam_utils::CachePadded;
9
10use crate::DeadlockHandler;
11use crate::latch::CoreLatch;
12use crate::registry::WorkerThread;
13
14mod counters;
15pub(crate) use self::counters::THREADS_MAX;
16use self::counters::{AtomicCounters, JobsEventCounter};
17
/// Bookkeeping used for deadlock detection: how many workers exist and
/// how many are currently active vs. blocked in user code. Guarded by the
/// `data` mutex inside [`Sleep`].
struct SleepData {
    /// The number of threads in the thread pool.
    worker_count: usize,

    /// The number of threads in the thread pool which are running and
    /// aren't blocked in user code or sleeping.
    active_threads: usize,

    /// The number of threads which are blocked in user code.
    /// This doesn't include threads blocked by this module.
    blocked_threads: usize,
}
30
31impl SleepData {
32    /// Checks if the conditions for a deadlock holds and if so calls the deadlock handler
33    #[inline]
34    pub(super) fn deadlock_check(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
35        if self.active_threads == 0 && self.blocked_threads > 0 {
36            (deadlock_handler.as_ref().unwrap())();
37        }
38    }
39}
40
/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
/// of workers. It has callbacks that are invoked periodically at significant events,
/// such as when workers are looping and looking for work, when latches are set, or when
/// jobs are published, and it either blocks threads or wakes them in response to these
/// events. See the [`README.md`] in this module for more details.
///
/// [`README.md`]: README.md
pub(super) struct Sleep {
    /// One "sleep state" per worker. Used to track if a worker is sleeping and to have
    /// them block. Each entry is cache-padded to avoid false sharing between workers.
    worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,

    /// Packed atomic counters tracking idle threads, sleeping threads, and
    /// the jobs event counter (JEC).
    counters: AtomicCounters,

    /// Bookkeeping for deadlock detection; see [`SleepData`].
    data: Mutex<SleepData>,
}
57
/// An instance of this struct is created when a thread becomes idle.
/// It is consumed when the thread finds work, and passed by `&mut`
/// reference for operations that preserve the idle state. (In other
/// words, producing one of these structs is evidence the thread is
/// idle.) It tracks state such as how long the thread has been idle.
pub(super) struct IdleState {
    /// What is the worker index of the idle thread?
    worker_index: usize,

    /// How many rounds have we been circling without sleeping?
    rounds: u32,

    /// Once we become sleepy, what was the sleepy counter value?
    /// Set to `JobsEventCounter::DUMMY` otherwise.
    jobs_counter: JobsEventCounter,
}
74
/// The "sleep state" for an individual worker.
#[derive(Default)]
struct WorkerSleepState {
    /// Set to true when the worker goes to sleep; set to false when
    /// the worker is notified or when it wakes. Held under a mutex so
    /// that wakers synchronize with the sleeping worker via `condvar`.
    is_blocked: Mutex<bool>,

    /// Condition variable the sleeping worker waits on; signalled by
    /// `wake_specific_thread` after clearing `is_blocked`.
    condvar: Condvar,
}
84
/// Number of search rounds an idle worker spins (yielding each round)
/// before announcing itself "sleepy" via the jobs event counter.
const ROUNDS_UNTIL_SLEEPY: u32 = 32;
/// One extra round after becoming sleepy before the worker actually
/// tries to go to sleep.
const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
87
88impl Sleep {
89    pub(super) fn new(n_threads: usize) -> Sleep {
90        assert!(n_threads <= THREADS_MAX);
91        Sleep {
92            worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
93            counters: AtomicCounters::new(),
94            data: Mutex::new(SleepData {
95                worker_count: n_threads,
96                active_threads: n_threads,
97                blocked_threads: 0,
98            }),
99        }
100    }
101
102    /// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
103    /// if no other worker thread is active
104    #[inline]
105    pub(super) fn mark_blocked(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
106        let mut data = self.data.lock().unwrap();
107        debug_assert!(data.active_threads > 0);
108        debug_assert!(data.blocked_threads < data.worker_count);
109        debug_assert!(data.active_threads > 0);
110        data.active_threads -= 1;
111        data.blocked_threads += 1;
112
113        data.deadlock_check(deadlock_handler);
114    }
115
116    /// Mark a previously blocked Rayon worker thread as unblocked
117    #[inline]
118    pub(super) fn mark_unblocked(&self) {
119        let mut data = self.data.lock().unwrap();
120        debug_assert!(data.active_threads < data.worker_count);
121        debug_assert!(data.blocked_threads > 0);
122        data.active_threads += 1;
123        data.blocked_threads -= 1;
124    }
125
126    #[inline]
127    pub(super) fn start_looking(&self, worker_index: usize) -> IdleState {
128        self.counters.add_inactive_thread();
129
130        IdleState { worker_index, rounds: 0, jobs_counter: JobsEventCounter::DUMMY }
131    }
132
133    #[inline]
134    pub(super) fn work_found(&self) {
135        // If we were the last idle thread and other threads are still sleeping,
136        // then we should wake up another thread.
137        let threads_to_wake = self.counters.sub_inactive_thread();
138        self.wake_any_threads(threads_to_wake as u32);
139    }
140
141    #[inline]
142    pub(super) fn no_work_found(
143        &self,
144        idle_state: &mut IdleState,
145        latch: &CoreLatch,
146        thread: &WorkerThread,
147    ) {
148        if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
149            thread::yield_now();
150            idle_state.rounds += 1;
151        } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
152            idle_state.jobs_counter = self.announce_sleepy();
153            idle_state.rounds += 1;
154            thread::yield_now();
155        } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
156            idle_state.rounds += 1;
157            thread::yield_now();
158        } else {
159            debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
160            self.sleep(idle_state, latch, thread);
161        }
162    }
163
164    #[cold]
165    fn announce_sleepy(&self) -> JobsEventCounter {
166        self.counters.increment_jobs_event_counter_if(JobsEventCounter::is_active).jobs_counter()
167    }
168
169    #[cold]
170    fn sleep(&self, idle_state: &mut IdleState, latch: &CoreLatch, thread: &WorkerThread) {
171        let worker_index = idle_state.worker_index;
172
173        if !latch.get_sleepy() {
174            return;
175        }
176
177        let sleep_state = &self.worker_sleep_states[worker_index];
178        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
179        debug_assert!(!*is_blocked);
180
181        // Our latch was signalled. We should wake back up fully as we
182        // will have some stuff to do.
183        if !latch.fall_asleep() {
184            idle_state.wake_fully();
185            return;
186        }
187
188        loop {
189            let counters = self.counters.load(Ordering::SeqCst);
190
191            // Check if the JEC has changed since we got sleepy.
192            debug_assert!(idle_state.jobs_counter.is_sleepy());
193            if counters.jobs_counter() != idle_state.jobs_counter {
194                // JEC has changed, so a new job was posted, but for some reason
195                // we didn't see it. We should return to just before the SLEEPY
196                // state so we can do another search and (if we fail to find
197                // work) go back to sleep.
198                idle_state.wake_partly();
199                latch.wake_up();
200                return;
201            }
202
203            // Otherwise, let's move from IDLE to SLEEPING.
204            if self.counters.try_add_sleeping_thread(counters) {
205                break;
206            }
207        }
208
209        // Successfully registered as asleep.
210
211        // We have one last check for injected jobs to do. This protects against
212        // deadlock in the very unlikely event that
213        //
214        // - an external job is being injected while we are sleepy
215        // - that job triggers the rollover over the JEC such that we don't see it
216        // - we are the last active worker thread
217        std::sync::atomic::fence(Ordering::SeqCst);
218        if thread.has_injected_job() {
219            // If we see an externally injected job, then we have to 'wake
220            // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
221            // the one that wakes us.)
222            self.counters.sub_sleeping_thread();
223        } else {
224            {
225                // Decrement the number of active threads and check for a deadlock
226                let mut data = self.data.lock().unwrap();
227                data.active_threads -= 1;
228                data.deadlock_check(&thread.registry.deadlock_handler);
229            }
230
231            // If we don't see an injected job (the normal case), then flag
232            // ourselves as asleep and wait till we are notified.
233            //
234            // (Note that `is_blocked` is held under a mutex and the mutex was
235            // acquired *before* we incremented the "sleepy counter". This means
236            // that whomever is coming to wake us will have to wait until we
237            // release the mutex in the call to `wait`, so they will see this
238            // boolean as true.)
239            thread.registry.release_thread();
240            *is_blocked = true;
241            while *is_blocked {
242                is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
243            }
244
245            // Drop `is_blocked` now in case `acquire_thread` blocks
246            drop(is_blocked);
247
248            thread.registry.acquire_thread();
249        }
250
251        // Update other state:
252        idle_state.wake_fully();
253        latch.wake_up();
254    }
255
256    /// Notify the given thread that it should wake up (if it is
257    /// sleeping). When this method is invoked, we typically know the
258    /// thread is asleep, though in rare cases it could have been
259    /// awoken by (e.g.) new work having been posted.
260    pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
261        self.wake_specific_thread(target_worker_index);
262    }
263
264    /// Signals that `num_jobs` new jobs were injected into the thread
265    /// pool from outside. This function will ensure that there are
266    /// threads available to process them, waking threads from sleep
267    /// if necessary.
268    ///
269    /// # Parameters
270    ///
271    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
272    ///   We'll try to get at least one thread per job.
273    #[inline]
274    pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
275        // This fence is needed to guarantee that threads
276        // as they are about to fall asleep, observe any
277        // new jobs that may have been injected.
278        std::sync::atomic::fence(Ordering::SeqCst);
279
280        self.new_jobs(num_jobs, queue_was_empty)
281    }
282
283    /// Signals that `num_jobs` new jobs were pushed onto a thread's
284    /// local deque. This function will try to ensure that there are
285    /// threads available to process them, waking threads from sleep
286    /// if necessary. However, this is not guaranteed: under certain
287    /// race conditions, the function may fail to wake any new
288    /// threads; in that case the existing thread should eventually
289    /// pop the job.
290    ///
291    /// # Parameters
292    ///
293    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
294    ///   We'll try to get at least one thread per job.
295    #[inline]
296    pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
297        self.new_jobs(num_jobs, queue_was_empty)
298    }
299
300    /// Common helper for `new_injected_jobs` and `new_internal_jobs`.
301    #[inline]
302    fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
303        // Read the counters and -- if sleepy workers have announced themselves
304        // -- announce that there is now work available. The final value of `counters`
305        // with which we exit the loop thus corresponds to a state when
306        let counters = self.counters.increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
307        let num_awake_but_idle = counters.awake_but_idle_threads();
308        let num_sleepers = counters.sleeping_threads();
309
310        if num_sleepers == 0 {
311            // nobody to wake
312            return;
313        }
314
315        // Promote from u16 to u32 so we can interoperate with
316        // num_jobs more easily.
317        let num_awake_but_idle = num_awake_but_idle as u32;
318        let num_sleepers = num_sleepers as u32;
319
320        // If the queue is non-empty, then we always wake up a worker
321        // -- clearly the existing idle jobs aren't enough. Otherwise,
322        // check to see if we have enough idle workers.
323        if !queue_was_empty {
324            let num_to_wake = Ord::min(num_jobs, num_sleepers);
325            self.wake_any_threads(num_to_wake);
326        } else if num_awake_but_idle < num_jobs {
327            let num_to_wake = Ord::min(num_jobs - num_awake_but_idle, num_sleepers);
328            self.wake_any_threads(num_to_wake);
329        }
330    }
331
332    #[cold]
333    fn wake_any_threads(&self, mut num_to_wake: u32) {
334        if num_to_wake > 0 {
335            for i in 0..self.worker_sleep_states.len() {
336                if self.wake_specific_thread(i) {
337                    num_to_wake -= 1;
338                    if num_to_wake == 0 {
339                        return;
340                    }
341                }
342            }
343        }
344    }
345
346    fn wake_specific_thread(&self, index: usize) -> bool {
347        let sleep_state = &self.worker_sleep_states[index];
348
349        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
350        if *is_blocked {
351            *is_blocked = false;
352
353            // Increment the number of active threads
354            self.data.lock().unwrap().active_threads += 1;
355
356            sleep_state.condvar.notify_one();
357
358            // When the thread went to sleep, it will have incremented
359            // this value. When we wake it, its our job to decrement
360            // it. We could have the thread do it, but that would
361            // introduce a delay between when the thread was
362            // *notified* and when this counter was decremented. That
363            // might mislead people with new work into thinking that
364            // there are sleeping threads that they should try to
365            // wake, when in fact there is nothing left for them to
366            // do.
367            self.counters.sub_sleeping_thread();
368
369            true
370        } else {
371            false
372        }
373    }
374}
375
376impl IdleState {
377    fn wake_fully(&mut self) {
378        self.rounds = 0;
379        self.jobs_counter = JobsEventCounter::DUMMY;
380    }
381
382    fn wake_partly(&mut self) {
383        self.rounds = ROUNDS_UNTIL_SLEEPY;
384        self.jobs_counter = JobsEventCounter::DUMMY;
385    }
386}