// rustc_thread_pool/sleep/mod.rs

//! Code that decides when workers should go to sleep. See README.md
//! for an overview.

use std::sync::atomic::Ordering;
use std::sync::{Condvar, Mutex};
use std::thread;

use crossbeam_utils::CachePadded;

use crate::DeadlockHandler;
use crate::latch::CoreLatch;
use crate::registry::WorkerThread;

mod counters;
pub(crate) use self::counters::THREADS_MAX;
use self::counters::{AtomicCounters, JobsEventCounter};

/// Mutex-protected bookkeeping about how many workers exist and what
/// they are doing, used by `SleepData::deadlock_check` to detect the
/// situation where no thread can make progress.
struct SleepData {
    /// The number of threads in the thread pool.
    worker_count: usize,

    /// The number of threads in the thread pool which are running and
    /// aren't blocked in user code or sleeping.
    active_threads: usize,

    /// The number of threads which are blocked in user code.
    /// This doesn't include threads blocked by this module.
    blocked_threads: usize,
}

31impl SleepData {
32 /// Checks if the conditions for a deadlock holds and if so calls the deadlock handler
33 #[inline]
34 pub(super) fn deadlock_check(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
35 if self.active_threads == 0 && self.blocked_threads > 0 {
36 (deadlock_handler.as_ref().unwrap())();
37 }
38 }
39}
40
/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
/// of workers. It has callbacks that are invoked periodically at significant events,
/// such as when workers are looping and looking for work, when latches are set, or when
/// jobs are published, and it either blocks threads or wakes them in response to these
/// events. See the [`README.md`] in this module for more details.
///
/// [`README.md`]: README.md
pub(super) struct Sleep {
    /// One "sleep state" per worker. Used to track if a worker is sleeping and to have
    /// them block.
    worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,

    /// Lock-free counters tracking idle/sleeping workers and the jobs event counter.
    counters: AtomicCounters,

    /// Mutex-protected state used for deadlock detection.
    data: Mutex<SleepData>,
}

/// An instance of this struct is created when a thread becomes idle.
/// It is consumed when the thread finds work, and passed by `&mut`
/// reference for operations that preserve the idle state. (In other
/// words, producing one of these structs is evidence the thread is
/// idle.) It tracks state such as how long the thread has been idle.
pub(super) struct IdleState {
    /// What is the worker index of the idle thread?
    worker_index: usize,

    /// How many rounds have we been circling without sleeping?
    rounds: u32,

    /// Once we become sleepy, what was the sleepy counter value?
    /// Set to `JobsEventCounter::DUMMY` otherwise.
    jobs_counter: JobsEventCounter,
}

/// The "sleep state" for an individual worker.
#[derive(Default)]
struct WorkerSleepState {
    /// Set to true when the worker goes to sleep; set to false when
    /// the worker is notified or when it wakes.
    is_blocked: Mutex<bool>,

    /// Condvar the sleeping worker blocks on while `is_blocked` is true;
    /// notified by `wake_specific_thread`.
    condvar: Condvar,
}

/// How many fruitless search rounds a worker yields through before it
/// announces itself as "sleepy" (see `Sleep::no_work_found`).
const ROUNDS_UNTIL_SLEEPY: u32 = 32;
/// One more fruitless round after becoming sleepy, and the worker
/// actually attempts to go to sleep.
const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;

88impl Sleep {
89 pub(super) fn new(n_threads: usize) -> Sleep {
90 assert!(n_threads <= THREADS_MAX);
91 Sleep {
92 worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
93 counters: AtomicCounters::new(),
94 data: Mutex::new(SleepData {
95 worker_count: n_threads,
96 active_threads: n_threads,
97 blocked_threads: 0,
98 }),
99 }
100 }
101
102 /// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
103 /// if no other worker thread is active
104 #[inline]
105 pub(super) fn mark_blocked(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
106 let mut data = self.data.lock().unwrap();
107 debug_assert!(data.active_threads > 0);
108 debug_assert!(data.blocked_threads < data.worker_count);
109 debug_assert!(data.active_threads > 0);
110 data.active_threads -= 1;
111 data.blocked_threads += 1;
112
113 data.deadlock_check(deadlock_handler);
114 }
115
116 /// Mark a previously blocked Rayon worker thread as unblocked
117 #[inline]
118 pub(super) fn mark_unblocked(&self) {
119 let mut data = self.data.lock().unwrap();
120 debug_assert!(data.active_threads < data.worker_count);
121 debug_assert!(data.blocked_threads > 0);
122 data.active_threads += 1;
123 data.blocked_threads -= 1;
124 }
125
126 #[inline]
127 pub(super) fn start_looking(&self, worker_index: usize) -> IdleState {
128 self.counters.add_inactive_thread();
129
130 IdleState { worker_index, rounds: 0, jobs_counter: JobsEventCounter::DUMMY }
131 }
132
133 #[inline]
134 pub(super) fn work_found(&self) {
135 // If we were the last idle thread and other threads are still sleeping,
136 // then we should wake up another thread.
137 let threads_to_wake = self.counters.sub_inactive_thread();
138 self.wake_any_threads(threads_to_wake as u32);
139 }
140
141 #[inline]
142 pub(super) fn no_work_found(
143 &self,
144 idle_state: &mut IdleState,
145 latch: &CoreLatch,
146 thread: &WorkerThread,
147 steal: bool,
148 ) {
149 if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
150 thread::yield_now();
151 idle_state.rounds += 1;
152 } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
153 idle_state.jobs_counter = self.announce_sleepy();
154 idle_state.rounds += 1;
155 thread::yield_now();
156 } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
157 idle_state.rounds += 1;
158 thread::yield_now();
159 } else {
160 debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
161 self.sleep(idle_state, latch, thread, steal);
162 }
163 }
164
165 #[cold]
166 fn announce_sleepy(&self) -> JobsEventCounter {
167 self.counters.increment_jobs_event_counter_if(JobsEventCounter::is_active).jobs_counter()
168 }
169
170 #[cold]
171 fn sleep(
172 &self,
173 idle_state: &mut IdleState,
174 latch: &CoreLatch,
175 thread: &WorkerThread,
176 steal: bool,
177 ) {
178 let worker_index = idle_state.worker_index;
179
180 if !latch.get_sleepy() {
181 return;
182 }
183
184 let sleep_state = &self.worker_sleep_states[worker_index];
185 let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
186 debug_assert!(!*is_blocked);
187
188 // Our latch was signalled. We should wake back up fully as we
189 // will have some stuff to do.
190 if !latch.fall_asleep() {
191 idle_state.wake_fully();
192 return;
193 }
194
195 loop {
196 let counters = self.counters.load(Ordering::SeqCst);
197
198 // Check if the JEC has changed since we got sleepy.
199 debug_assert!(idle_state.jobs_counter.is_sleepy());
200 if counters.jobs_counter() != idle_state.jobs_counter {
201 // JEC has changed, so a new job was posted, but for some reason
202 // we didn't see it. We should return to just before the SLEEPY
203 // state so we can do another search and (if we fail to find
204 // work) go back to sleep.
205 idle_state.wake_partly();
206 latch.wake_up();
207 return;
208 }
209
210 // Otherwise, let's move from IDLE to SLEEPING.
211 if self.counters.try_add_sleeping_thread(counters) {
212 break;
213 }
214 }
215
216 // Successfully registered as asleep.
217
218 // We have one last check for injected jobs to do. This protects against
219 // deadlock in the very unlikely event that
220 //
221 // - an external job is being injected while we are sleepy
222 // - that job triggers the rollover over the JEC such that we don't see it
223 // - we are the last active worker thread
224 std::sync::atomic::fence(Ordering::SeqCst);
225 if steal && thread.has_injected_job() {
226 // If we see an externally injected job, then we have to 'wake
227 // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
228 // the one that wakes us.)
229 self.counters.sub_sleeping_thread();
230 } else {
231 {
232 // Decrement the number of active threads and check for a deadlock
233 let mut data = self.data.lock().unwrap();
234 data.active_threads -= 1;
235 data.deadlock_check(&thread.registry.deadlock_handler);
236 }
237
238 // If we don't see an injected job (the normal case), then flag
239 // ourselves as asleep and wait till we are notified.
240 //
241 // (Note that `is_blocked` is held under a mutex and the mutex was
242 // acquired *before* we incremented the "sleepy counter". This means
243 // that whomever is coming to wake us will have to wait until we
244 // release the mutex in the call to `wait`, so they will see this
245 // boolean as true.)
246 thread.registry.release_thread();
247 *is_blocked = true;
248 while *is_blocked {
249 is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
250 }
251
252 // Drop `is_blocked` now in case `acquire_thread` blocks
253 drop(is_blocked);
254
255 thread.registry.acquire_thread();
256 }
257
258 // Update other state:
259 idle_state.wake_fully();
260 latch.wake_up();
261 }
262
263 /// Notify the given thread that it should wake up (if it is
264 /// sleeping). When this method is invoked, we typically know the
265 /// thread is asleep, though in rare cases it could have been
266 /// awoken by (e.g.) new work having been posted.
267 pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
268 self.wake_specific_thread(target_worker_index);
269 }
270
271 /// Signals that `num_jobs` new jobs were injected into the thread
272 /// pool from outside. This function will ensure that there are
273 /// threads available to process them, waking threads from sleep
274 /// if necessary.
275 ///
276 /// # Parameters
277 ///
278 /// - `num_jobs` -- lower bound on number of jobs available for stealing.
279 /// We'll try to get at least one thread per job.
280 #[inline]
281 pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
282 // This fence is needed to guarantee that threads
283 // as they are about to fall asleep, observe any
284 // new jobs that may have been injected.
285 std::sync::atomic::fence(Ordering::SeqCst);
286
287 self.new_jobs(num_jobs, queue_was_empty)
288 }
289
290 /// Signals that `num_jobs` new jobs were pushed onto a thread's
291 /// local deque. This function will try to ensure that there are
292 /// threads available to process them, waking threads from sleep
293 /// if necessary. However, this is not guaranteed: under certain
294 /// race conditions, the function may fail to wake any new
295 /// threads; in that case the existing thread should eventually
296 /// pop the job.
297 ///
298 /// # Parameters
299 ///
300 /// - `num_jobs` -- lower bound on number of jobs available for stealing.
301 /// We'll try to get at least one thread per job.
302 #[inline]
303 pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
304 self.new_jobs(num_jobs, queue_was_empty)
305 }
306
307 /// Common helper for `new_injected_jobs` and `new_internal_jobs`.
308 #[inline]
309 fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
310 // Read the counters and -- if sleepy workers have announced themselves
311 // -- announce that there is now work available. The final value of `counters`
312 // with which we exit the loop thus corresponds to a state when
313 let counters = self.counters.increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
314 let num_awake_but_idle = counters.awake_but_idle_threads();
315 let num_sleepers = counters.sleeping_threads();
316
317 if num_sleepers == 0 {
318 // nobody to wake
319 return;
320 }
321
322 // Promote from u16 to u32 so we can interoperate with
323 // num_jobs more easily.
324 let num_awake_but_idle = num_awake_but_idle as u32;
325 let num_sleepers = num_sleepers as u32;
326
327 // If the queue is non-empty, then we always wake up a worker
328 // -- clearly the existing idle jobs aren't enough. Otherwise,
329 // check to see if we have enough idle workers.
330 if !queue_was_empty {
331 let num_to_wake = Ord::min(num_jobs, num_sleepers);
332 self.wake_any_threads(num_to_wake);
333 } else if num_awake_but_idle < num_jobs {
334 let num_to_wake = Ord::min(num_jobs - num_awake_but_idle, num_sleepers);
335 self.wake_any_threads(num_to_wake);
336 }
337 }
338
339 #[cold]
340 fn wake_any_threads(&self, mut num_to_wake: u32) {
341 if num_to_wake > 0 {
342 for i in 0..self.worker_sleep_states.len() {
343 if self.wake_specific_thread(i) {
344 num_to_wake -= 1;
345 if num_to_wake == 0 {
346 return;
347 }
348 }
349 }
350 }
351 }
352
353 fn wake_specific_thread(&self, index: usize) -> bool {
354 let sleep_state = &self.worker_sleep_states[index];
355
356 let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
357 if *is_blocked {
358 *is_blocked = false;
359
360 // Increment the number of active threads
361 self.data.lock().unwrap().active_threads += 1;
362
363 sleep_state.condvar.notify_one();
364
365 // When the thread went to sleep, it will have incremented
366 // this value. When we wake it, its our job to decrement
367 // it. We could have the thread do it, but that would
368 // introduce a delay between when the thread was
369 // *notified* and when this counter was decremented. That
370 // might mislead people with new work into thinking that
371 // there are sleeping threads that they should try to
372 // wake, when in fact there is nothing left for them to
373 // do.
374 self.counters.sub_sleeping_thread();
375
376 true
377 } else {
378 false
379 }
380 }
381}
382
383impl IdleState {
384 fn wake_fully(&mut self) {
385 self.rounds = 0;
386 self.jobs_counter = JobsEventCounter::DUMMY;
387 }
388
389 fn wake_partly(&mut self) {
390 self.rounds = ROUNDS_UNTIL_SLEEPY;
391 self.jobs_counter = JobsEventCounter::DUMMY;
392 }
393}