rustc_thread_pool/sleep/mod.rs
1//! Code that decides when workers should go to sleep. See README.md
2//! for an overview.
3
4use std::sync::atomic::Ordering;
5use std::sync::{Condvar, Mutex};
6use std::thread;
7
8use crossbeam_utils::CachePadded;
9
10use crate::DeadlockHandler;
11use crate::latch::CoreLatch;
12use crate::registry::WorkerThread;
13
14mod counters;
15pub(crate) use self::counters::THREADS_MAX;
16use self::counters::{AtomicCounters, JobsEventCounter};
17
/// Mutex-protected bookkeeping used for deadlock detection: tracks how
/// many workers are active, sleeping, or blocked in user code.
struct SleepData {
    /// The number of threads in the thread pool.
    worker_count: usize,

    /// The number of threads in the thread pool which are running and
    /// aren't blocked in user code or sleeping.
    active_threads: usize,

    /// The number of threads which are blocked in user code.
    /// This doesn't include threads blocked by this module
    /// (i.e. threads sleeping on the condvar are not counted here).
    blocked_threads: usize,
}
30
31impl SleepData {
32 /// Checks if the conditions for a deadlock holds and if so calls the deadlock handler
33 #[inline]
34 pub(super) fn deadlock_check(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
35 if self.active_threads == 0 && self.blocked_threads > 0 {
36 (deadlock_handler.as_ref().unwrap())();
37 }
38 }
39}
40
/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
/// of workers. It has callbacks that are invoked periodically at significant events,
/// such as when workers are looping and looking for work, when latches are set, or when
/// jobs are published, and it either blocks threads or wakes them in response to these
/// events. See the [`README.md`] in this module for more details.
///
/// [`README.md`]: README.md
pub(super) struct Sleep {
    /// One "sleep state" per worker. Used to track if a worker is sleeping and to have
    /// them block. Entries are cache-padded to avoid false sharing between
    /// workers touching adjacent states.
    worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,

    /// Packed atomic counters: idle threads, sleeping threads, and the
    /// jobs event counter (JEC). See the `counters` submodule.
    counters: AtomicCounters,

    /// Mutex-protected state used for deadlock detection.
    data: Mutex<SleepData>,
}
57
/// An instance of this struct is created when a thread becomes idle.
/// It is consumed when the thread finds work, and passed by `&mut`
/// reference for operations that preserve the idle state. (In other
/// words, producing one of these structs is evidence the thread is
/// idle.) It tracks state such as how long the thread has been idle.
pub(super) struct IdleState {
    /// What is worker index of the idle thread?
    worker_index: usize,

    /// How many rounds have we been circling without sleeping?
    /// Compared against `ROUNDS_UNTIL_SLEEPY` / `ROUNDS_UNTIL_SLEEPING`.
    rounds: u32,

    /// Once we become sleepy, what was the sleepy counter value?
    /// Set to `INVALID_SLEEPY_COUNTER` otherwise.
    jobs_counter: JobsEventCounter,
}
74
/// The "sleep state" for an individual worker.
#[derive(Default)]
struct WorkerSleepState {
    /// Set to true when the worker goes to sleep; set to false when
    /// the worker is notified or when it wakes.
    is_blocked: Mutex<bool>,

    /// Condvar paired with `is_blocked`: the sleeping worker waits on it,
    /// and wakers notify it after flipping `is_blocked` to false.
    condvar: Condvar,
}
84
/// Number of idle rounds (each a `yield_now`) a worker spins before it
/// announces itself as "sleepy".
const ROUNDS_UNTIL_SLEEPY: u32 = 32;
/// One additional round after becoming sleepy before actually sleeping.
const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
87
88impl Sleep {
89 pub(super) fn new(n_threads: usize) -> Sleep {
90 assert!(n_threads <= THREADS_MAX);
91 Sleep {
92 worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
93 counters: AtomicCounters::new(),
94 data: Mutex::new(SleepData {
95 worker_count: n_threads,
96 active_threads: n_threads,
97 blocked_threads: 0,
98 }),
99 }
100 }
101
102 /// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
103 /// if no other worker thread is active
104 #[inline]
105 pub(super) fn mark_blocked(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
106 let mut data = self.data.lock().unwrap();
107 debug_assert!(data.active_threads > 0);
108 debug_assert!(data.blocked_threads < data.worker_count);
109 debug_assert!(data.active_threads > 0);
110 data.active_threads -= 1;
111 data.blocked_threads += 1;
112
113 data.deadlock_check(deadlock_handler);
114 }
115
116 /// Mark a previously blocked Rayon worker thread as unblocked
117 #[inline]
118 pub(super) fn mark_unblocked(&self) {
119 let mut data = self.data.lock().unwrap();
120 debug_assert!(data.active_threads < data.worker_count);
121 debug_assert!(data.blocked_threads > 0);
122 data.active_threads += 1;
123 data.blocked_threads -= 1;
124 }
125
126 #[inline]
127 pub(super) fn start_looking(&self, worker_index: usize) -> IdleState {
128 self.counters.add_inactive_thread();
129
130 IdleState { worker_index, rounds: 0, jobs_counter: JobsEventCounter::DUMMY }
131 }
132
133 #[inline]
134 pub(super) fn work_found(&self) {
135 // If we were the last idle thread and other threads are still sleeping,
136 // then we should wake up another thread.
137 let threads_to_wake = self.counters.sub_inactive_thread();
138 self.wake_any_threads(threads_to_wake as u32);
139 }
140
141 #[inline]
142 pub(super) fn no_work_found(
143 &self,
144 idle_state: &mut IdleState,
145 latch: &CoreLatch,
146 thread: &WorkerThread,
147 ) {
148 if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
149 thread::yield_now();
150 idle_state.rounds += 1;
151 } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
152 idle_state.jobs_counter = self.announce_sleepy();
153 idle_state.rounds += 1;
154 thread::yield_now();
155 } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
156 idle_state.rounds += 1;
157 thread::yield_now();
158 } else {
159 debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
160 self.sleep(idle_state, latch, thread);
161 }
162 }
163
164 #[cold]
165 fn announce_sleepy(&self) -> JobsEventCounter {
166 self.counters.increment_jobs_event_counter_if(JobsEventCounter::is_active).jobs_counter()
167 }
168
169 #[cold]
170 fn sleep(&self, idle_state: &mut IdleState, latch: &CoreLatch, thread: &WorkerThread) {
171 let worker_index = idle_state.worker_index;
172
173 if !latch.get_sleepy() {
174 return;
175 }
176
177 let sleep_state = &self.worker_sleep_states[worker_index];
178 let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
179 debug_assert!(!*is_blocked);
180
181 // Our latch was signalled. We should wake back up fully as we
182 // will have some stuff to do.
183 if !latch.fall_asleep() {
184 idle_state.wake_fully();
185 return;
186 }
187
188 loop {
189 let counters = self.counters.load(Ordering::SeqCst);
190
191 // Check if the JEC has changed since we got sleepy.
192 debug_assert!(idle_state.jobs_counter.is_sleepy());
193 if counters.jobs_counter() != idle_state.jobs_counter {
194 // JEC has changed, so a new job was posted, but for some reason
195 // we didn't see it. We should return to just before the SLEEPY
196 // state so we can do another search and (if we fail to find
197 // work) go back to sleep.
198 idle_state.wake_partly();
199 latch.wake_up();
200 return;
201 }
202
203 // Otherwise, let's move from IDLE to SLEEPING.
204 if self.counters.try_add_sleeping_thread(counters) {
205 break;
206 }
207 }
208
209 // Successfully registered as asleep.
210
211 // We have one last check for injected jobs to do. This protects against
212 // deadlock in the very unlikely event that
213 //
214 // - an external job is being injected while we are sleepy
215 // - that job triggers the rollover over the JEC such that we don't see it
216 // - we are the last active worker thread
217 std::sync::atomic::fence(Ordering::SeqCst);
218 if thread.has_injected_job() {
219 // If we see an externally injected job, then we have to 'wake
220 // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
221 // the one that wakes us.)
222 self.counters.sub_sleeping_thread();
223 } else {
224 {
225 // Decrement the number of active threads and check for a deadlock
226 let mut data = self.data.lock().unwrap();
227 data.active_threads -= 1;
228 data.deadlock_check(&thread.registry.deadlock_handler);
229 }
230
231 // If we don't see an injected job (the normal case), then flag
232 // ourselves as asleep and wait till we are notified.
233 //
234 // (Note that `is_blocked` is held under a mutex and the mutex was
235 // acquired *before* we incremented the "sleepy counter". This means
236 // that whomever is coming to wake us will have to wait until we
237 // release the mutex in the call to `wait`, so they will see this
238 // boolean as true.)
239 thread.registry.release_thread();
240 *is_blocked = true;
241 while *is_blocked {
242 is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
243 }
244
245 // Drop `is_blocked` now in case `acquire_thread` blocks
246 drop(is_blocked);
247
248 thread.registry.acquire_thread();
249 }
250
251 // Update other state:
252 idle_state.wake_fully();
253 latch.wake_up();
254 }
255
256 /// Notify the given thread that it should wake up (if it is
257 /// sleeping). When this method is invoked, we typically know the
258 /// thread is asleep, though in rare cases it could have been
259 /// awoken by (e.g.) new work having been posted.
260 pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
261 self.wake_specific_thread(target_worker_index);
262 }
263
264 /// Signals that `num_jobs` new jobs were injected into the thread
265 /// pool from outside. This function will ensure that there are
266 /// threads available to process them, waking threads from sleep
267 /// if necessary.
268 ///
269 /// # Parameters
270 ///
271 /// - `num_jobs` -- lower bound on number of jobs available for stealing.
272 /// We'll try to get at least one thread per job.
273 #[inline]
274 pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
275 // This fence is needed to guarantee that threads
276 // as they are about to fall asleep, observe any
277 // new jobs that may have been injected.
278 std::sync::atomic::fence(Ordering::SeqCst);
279
280 self.new_jobs(num_jobs, queue_was_empty)
281 }
282
283 /// Signals that `num_jobs` new jobs were pushed onto a thread's
284 /// local deque. This function will try to ensure that there are
285 /// threads available to process them, waking threads from sleep
286 /// if necessary. However, this is not guaranteed: under certain
287 /// race conditions, the function may fail to wake any new
288 /// threads; in that case the existing thread should eventually
289 /// pop the job.
290 ///
291 /// # Parameters
292 ///
293 /// - `num_jobs` -- lower bound on number of jobs available for stealing.
294 /// We'll try to get at least one thread per job.
295 #[inline]
296 pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
297 self.new_jobs(num_jobs, queue_was_empty)
298 }
299
300 /// Common helper for `new_injected_jobs` and `new_internal_jobs`.
301 #[inline]
302 fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
303 // Read the counters and -- if sleepy workers have announced themselves
304 // -- announce that there is now work available. The final value of `counters`
305 // with which we exit the loop thus corresponds to a state when
306 let counters = self.counters.increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
307 let num_awake_but_idle = counters.awake_but_idle_threads();
308 let num_sleepers = counters.sleeping_threads();
309
310 if num_sleepers == 0 {
311 // nobody to wake
312 return;
313 }
314
315 // Promote from u16 to u32 so we can interoperate with
316 // num_jobs more easily.
317 let num_awake_but_idle = num_awake_but_idle as u32;
318 let num_sleepers = num_sleepers as u32;
319
320 // If the queue is non-empty, then we always wake up a worker
321 // -- clearly the existing idle jobs aren't enough. Otherwise,
322 // check to see if we have enough idle workers.
323 if !queue_was_empty {
324 let num_to_wake = Ord::min(num_jobs, num_sleepers);
325 self.wake_any_threads(num_to_wake);
326 } else if num_awake_but_idle < num_jobs {
327 let num_to_wake = Ord::min(num_jobs - num_awake_but_idle, num_sleepers);
328 self.wake_any_threads(num_to_wake);
329 }
330 }
331
332 #[cold]
333 fn wake_any_threads(&self, mut num_to_wake: u32) {
334 if num_to_wake > 0 {
335 for i in 0..self.worker_sleep_states.len() {
336 if self.wake_specific_thread(i) {
337 num_to_wake -= 1;
338 if num_to_wake == 0 {
339 return;
340 }
341 }
342 }
343 }
344 }
345
346 fn wake_specific_thread(&self, index: usize) -> bool {
347 let sleep_state = &self.worker_sleep_states[index];
348
349 let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
350 if *is_blocked {
351 *is_blocked = false;
352
353 // Increment the number of active threads
354 self.data.lock().unwrap().active_threads += 1;
355
356 sleep_state.condvar.notify_one();
357
358 // When the thread went to sleep, it will have incremented
359 // this value. When we wake it, its our job to decrement
360 // it. We could have the thread do it, but that would
361 // introduce a delay between when the thread was
362 // *notified* and when this counter was decremented. That
363 // might mislead people with new work into thinking that
364 // there are sleeping threads that they should try to
365 // wake, when in fact there is nothing left for them to
366 // do.
367 self.counters.sub_sleeping_thread();
368
369 true
370 } else {
371 false
372 }
373 }
374}
375
376impl IdleState {
377 fn wake_fully(&mut self) {
378 self.rounds = 0;
379 self.jobs_counter = JobsEventCounter::DUMMY;
380 }
381
382 fn wake_partly(&mut self) {
383 self.rounds = ROUNDS_UNTIL_SLEEPY;
384 self.jobs_counter = JobsEventCounter::DUMMY;
385 }
386}