rustc_thread_pool/latch.rs

use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};

use crate::registry::{Registry, WorkerThread};

/// We define various kinds of latches, which are all a primitive signaling
/// mechanism. A latch starts as false. Eventually someone calls `set()` and
/// it becomes true. You can test if it has been set by calling `probe()`.
///
/// Some kinds of latches, but not all, support a `wait()` operation
/// that will wait until the latch is set, blocking efficiently. That
/// is not part of the trait since it is not possible to do with all
/// latches.
///
/// The intention is that `set()` is called once, but `probe()` may be
/// called any number of times. Once `probe()` returns true, the memory
/// effects that occurred before `set()` become visible.
///
/// It'd probably be better to refactor the API into two paired types,
/// but that's a bit of work, and this is not a public API.
///
/// ## Memory ordering
///
/// Latches need to guarantee two things:
///
/// - Once `probe()` returns true, all memory effects from the `set()`
///   are visible (in other words, the set should synchronize-with
///   the probe).
/// - Once `set()` occurs, the next `probe()` *will* observe it. This
///   typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep
///   README](/src/sleep/README.md#tickle-then-get-sleepy) for details.
pub(super) trait Latch {
    /// Set the latch, signalling others.
    ///
    /// # WARNING
    ///
    /// Setting a latch triggers other threads to wake up and (in some
    /// cases) complete. This may, in turn, cause memory to be
    /// deallocated and so forth. One must be very careful about this,
    /// and it's typically better to read all the fields you will need
    /// to access *before* a latch is set!
    ///
    /// This function operates on `*const Self` instead of `&self` to allow it
    /// to become dangling during this call. The caller must ensure that the
    /// pointer is valid upon entry, and not invalidated during the call by any
    /// actions other than `set` itself.
    unsafe fn set(this: *const Self);
}
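
// A hedged sketch (not part of the original file) of the calling
// convention documented above: convert the reference to a raw pointer
// before the call, and never touch the latch afterwards, since the
// owning thread may already have freed it. `finish_job` is a
// hypothetical caller, not a real API in this crate:
//
//     unsafe fn finish_job<L: Latch>(latch: &L) {
//         let ptr: *const L = latch;
//         // After `set`, `ptr` may dangle; do not read through it.
//         unsafe { L::set(ptr) };
//     }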

pub(super) trait AsCoreLatch {
    fn as_core_latch(&self) -> &CoreLatch;
}

/// Latch is not set, owning thread is awake.
const UNSET: usize = 0;

/// Latch is not set, owning thread is going to sleep on this latch
/// (but has not yet fallen asleep).
const SLEEPY: usize = 1;

/// Latch is not set, owning thread is asleep on this latch and
/// must be awoken.
const SLEEPING: usize = 2;

/// Latch is set.
const SET: usize = 3;
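
// The four states above form a small state machine. A sketch of the
// legal transitions, reconstructed from the `CoreLatch` methods below:
//
//     UNSET --get_sleepy()--> SLEEPY --fall_asleep()--> SLEEPING
//     SLEEPING --wake_up()--> UNSET   (only if not yet set)
//     UNSET | SLEEPY | SLEEPING --set()--> SET   (terminal)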

/// The shared core of the latches that support sleeping: an atomic
/// state machine recording whether the latch has been set and whether
/// its owning thread is awake, about to sleep, or asleep (the
/// `UNSET`/`SLEEPY`/`SLEEPING`/`SET` states above).
#[derive(Debug)]
pub(super) struct CoreLatch {
    state: AtomicUsize,
}

impl CoreLatch {
    #[inline]
    fn new() -> Self {
        Self { state: AtomicUsize::new(UNSET) }
    }

    /// Invoked by the owning thread as it prepares to sleep. Returns true
    /// if the owning thread may proceed to fall asleep, false if the
    /// latch was set in the meantime.
    #[inline]
    pub(super) fn get_sleepy(&self) -> bool {
        self.state.compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed).is_ok()
    }

    /// Invoked by the owning thread as it falls asleep. Returns
    /// true if the owning thread should block, or false if the latch
    /// was set in the meantime.
    #[inline]
    pub(super) fn fall_asleep(&self) -> bool {
        self.state.compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed).is_ok()
    }

    /// Invoked by the owning thread as it wakes up. Clears the
    /// `SLEEPING` state (returning to `UNSET`) unless the latch was
    /// set in the meantime.
    #[inline]
    pub(super) fn wake_up(&self) {
        if !self.probe() {
            let _ =
                self.state.compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed);
        }
    }

    /// Set the latch. If this returns true, the owning thread was sleeping
    /// and must be awoken.
    ///
    /// This is private because, typically, setting a latch involves
    /// doing some wakeups; those are encapsulated in the surrounding
    /// latch code.
    #[inline]
    unsafe fn set(this: *const Self) -> bool {
        let old_state = unsafe { (*this).state.swap(SET, Ordering::AcqRel) };
        old_state == SLEEPING
    }

    /// Test if this latch has been set.
    #[inline]
    pub(super) fn probe(&self) -> bool {
        self.state.load(Ordering::Acquire) == SET
    }
}
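
// A hedged sketch of the two-phase sleep protocol these methods
// support (the real driver lives in the sleep module; `park_self` and
// `unpark_owner` are hypothetical stand-ins for the actual blocking
// machinery):
//
//     // Owning thread, about to idle:
//     if latch.get_sleepy() {
//         // Last chance to notice new work: a racing `set()` makes
//         // `fall_asleep` fail, and we stay awake.
//         if latch.fall_asleep() {
//             park_self();
//         }
//         latch.wake_up();
//     }
//
//     // Setting thread:
//     if unsafe { CoreLatch::set(latch_ptr) } {
//         unpark_owner(); // owner was SLEEPING and must be woken
//     }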

impl AsCoreLatch for CoreLatch {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        self
    }
}

/// Spin latches are the simplest, most efficient kind, but they do
/// not support a `wait()` operation. They just have a boolean flag
/// that becomes true when `set()` is called.
pub(super) struct SpinLatch<'r> {
    core_latch: CoreLatch,
    registry: &'r Arc<Registry>,
    target_worker_index: usize,
    cross: bool,
}

impl<'r> SpinLatch<'r> {
    /// Creates a new spin latch that is owned by `thread`. This means
    /// that `thread` is the only thread that should be blocking on
    /// this latch -- it also means that when the latch is set, we
    /// will wake `thread` if it is sleeping.
    #[inline]
    pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> {
        SpinLatch {
            core_latch: CoreLatch::new(),
            registry: thread.registry(),
            target_worker_index: thread.index(),
            cross: false,
        }
    }

    /// Creates a new spin latch for cross-threadpool blocking. Notably, we
    /// need to make sure the registry is kept alive after setting, so we can
    /// safely call the notification.
    #[inline]
    pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> {
        SpinLatch { cross: true, ..SpinLatch::new(thread) }
    }

    #[inline]
    pub(super) fn probe(&self) -> bool {
        self.core_latch.probe()
    }
}

impl<'r> AsCoreLatch for SpinLatch<'r> {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}

impl<'r> Latch for SpinLatch<'r> {
    #[inline]
    unsafe fn set(this: *const Self) {
        let cross_registry;

        let registry: &Registry = if unsafe { (*this).cross } {
            // Ensure the registry stays alive while we notify it.
            // Otherwise, it would be possible that we set the spin
            // latch and the other thread sees it and exits, causing
            // the registry to be deallocated, all before we get a
            // chance to invoke `registry.notify_worker_latch_is_set`.
            cross_registry = Arc::clone(unsafe { (*this).registry });
            &cross_registry
        } else {
            // If this is not a "cross-registry" spin-latch, then the
            // thread which is performing `set` is itself ensuring
            // that the registry stays alive. However, that doesn't
            // include this *particular* `Arc` handle if the waiting
            // thread then exits, so we must completely dereference it.
            unsafe { (*this).registry }
        };
        let target_worker_index = unsafe { (*this).target_worker_index };

        // NOTE: Once we `set`, the target may proceed and invalidate `this`!
        if unsafe { CoreLatch::set(&(*this).core_latch) } {
            // Subtle: at this point, we can no longer read from
            // `self`, because the thread owning this spin latch may
            // have awoken and deallocated the latch. Therefore, we
            // only use fields whose values we already read.
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}
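
// A hedged sketch of typical `SpinLatch` use (assuming a worker-thread
// context like the one `CountLatch::wait` relies on below; `worker` and
// the job plumbing are illustrative):
//
//     let latch = SpinLatch::new(worker);
//     // Hand the job a pointer to `latch`; on completion it calls
//     // `SpinLatch::set(&latch)`, waking `worker` if it went to sleep.
//     // Meanwhile the worker keeps stealing jobs until the latch is set:
//     unsafe { worker.wait_until(&latch) };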

/// A Latch starts as false and eventually becomes true. You can block
/// until it becomes true.
#[derive(Debug)]
pub(super) struct LockLatch {
    m: Mutex<bool>,
    v: Condvar,
}

impl LockLatch {
    #[inline]
    pub(super) fn new() -> LockLatch {
        LockLatch { m: Mutex::new(false), v: Condvar::new() }
    }

    /// Block until the latch is set, then reset it so it can be reused again.
    pub(super) fn wait_and_reset(&self) {
        let mut guard = self.m.lock().unwrap();
        while !*guard {
            guard = self.v.wait(guard).unwrap();
        }
        *guard = false;
    }

    /// Block until the latch is set.
    pub(super) fn wait(&self) {
        let mut guard = self.m.lock().unwrap();
        while !*guard {
            guard = self.v.wait(guard).unwrap();
        }
    }
}

impl Latch for LockLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        let mut guard = unsafe { (*this).m.lock().unwrap() };
        *guard = true;
        unsafe { (*this).v.notify_all() };
    }
}
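
// A hedged, self-contained sketch of `LockLatch` across threads
// (illustrative only; `set` is reached through the `Latch` trait):
//
//     let latch = Arc::new(LockLatch::new());
//     let latch2 = Arc::clone(&latch);
//     std::thread::spawn(move || {
//         // The Arc keeps the latch alive across `set`.
//         unsafe { LockLatch::set(&*latch2) };
//     });
//     latch.wait(); // blocks until the spawned thread sets the latch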

/// Once latches are used to implement one-time blocking, primarily
/// for the termination flag of the threads in the pool.
///
/// Note: like a `SpinLatch`, once-latches are always associated with
/// some registry that is probing them, which must be tickled when
/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
/// reference to that registry. This is because in some cases the
/// registry owns the once-latch, and that would create a cycle. So a
/// `OnceLatch` must be given a reference to its owning registry when
/// it is set. For this reason, it does not implement the `Latch`
/// trait (but it doesn't have to, as it is not used in those generic
/// contexts).
#[derive(Debug)]
pub(super) struct OnceLatch {
    core_latch: CoreLatch,
}

impl OnceLatch {
    #[inline]
    pub(super) fn new() -> OnceLatch {
        Self { core_latch: CoreLatch::new() }
    }

    /// Set the latch, then tickle the specific worker thread,
    /// which should be the one that owns this latch.
    #[inline]
    pub(super) unsafe fn set_and_tickle_one(
        this: *const Self,
        registry: &Registry,
        target_worker_index: usize,
    ) {
        if unsafe { CoreLatch::set(&(*this).core_latch) } {
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}

impl AsCoreLatch for OnceLatch {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}
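
// A hedged sketch of the pattern described above: the registry owns the
// latch, so the registry reference is supplied only at set time. The
// `terminate` field is a hypothetical name for wherever the registry
// stores the per-thread `OnceLatch`:
//
//     // Somewhere in `Registry`, asking worker `index` to terminate:
//     unsafe {
//         OnceLatch::set_and_tickle_one(&thread_info.terminate, self, index);
//     }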

/// Counting latches are used to implement scopes. They track a
/// counter. Unlike other latches, calling `set()` does not
/// necessarily make the latch be considered `set()`; instead, it just
/// decrements the counter. The latch is only "set" (in the sense that
/// `probe()` returns true) once the counter reaches zero.
#[derive(Debug)]
pub(super) struct CountLatch {
    counter: AtomicUsize,
    kind: CountLatchKind,
}

enum CountLatchKind {
    /// A latch for scopes created on a rayon thread which will participate in work-
    /// stealing while it waits for completion. This thread is not necessarily part
    /// of the same registry as the scope itself!
    Stealing {
        latch: CoreLatch,
        /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
        /// with registry B, when a job completes in a thread of registry B, we may
        /// need to call `notify_worker_latch_is_set()` to wake the thread in registry A.
        /// That means we need a reference to registry A (since at that point we will
        /// only have a reference to registry B), so we stash it here.
        registry: Arc<Registry>,
        /// The index of the worker to wake in `registry`
        worker_index: usize,
    },

    /// A latch for scopes created on a non-rayon thread which will block to wait.
    Blocking { latch: LockLatch },
}

impl std::fmt::Debug for CountLatchKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CountLatchKind::Stealing { latch, .. } => {
                f.debug_tuple("Stealing").field(latch).finish()
            }
            CountLatchKind::Blocking { latch, .. } => {
                f.debug_tuple("Blocking").field(latch).finish()
            }
        }
    }
}

impl CountLatch {
    pub(super) fn new(owner: Option<&WorkerThread>) -> Self {
        Self::with_count(1, owner)
    }

    pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self {
        Self {
            counter: AtomicUsize::new(count),
            kind: match owner {
                Some(owner) => CountLatchKind::Stealing {
                    latch: CoreLatch::new(),
                    registry: Arc::clone(owner.registry()),
                    worker_index: owner.index(),
                },
                None => CountLatchKind::Blocking { latch: LockLatch::new() },
            },
        }
    }

    #[inline]
    pub(super) fn increment(&self) {
        let old_counter = self.counter.fetch_add(1, Ordering::Relaxed);
        debug_assert!(old_counter != 0);
    }

    pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
        match &self.kind {
            CountLatchKind::Stealing { latch, registry, worker_index } => unsafe {
                let owner = owner.expect("owner thread");
                debug_assert_eq!(registry.id(), owner.registry().id());
                debug_assert_eq!(*worker_index, owner.index());
                owner.wait_until(latch);
            },
            CountLatchKind::Blocking { latch } => latch.wait(),
        }
    }
}

impl Latch for CountLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        if unsafe { (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 } {
            // NOTE: Once we call `set` on the internal `latch`,
            // the target may proceed and invalidate `this`!
            match unsafe { &(*this).kind } {
                CountLatchKind::Stealing { latch, registry, worker_index } => {
                    let registry = Arc::clone(registry);
                    if unsafe { CoreLatch::set(latch) } {
                        registry.notify_worker_latch_is_set(*worker_index);
                    }
                }
                CountLatchKind::Blocking { latch } => unsafe { LockLatch::set(latch) },
            }
        }
    }
}
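
// A hedged sketch of the counting semantics used by scopes (`spawn_job`
// and the loop are illustrative): the latch starts at one count for the
// scope itself, gains one per spawned job, and only reads as set once
// every count has been dropped via `set`.
//
//     let latch = CountLatch::new(owner);     // counter == 1
//     for job in jobs {
//         latch.increment();                  // one count per job
//         spawn_job(job);                     // job calls CountLatch::set(&latch) when done
//     }
//     unsafe { CountLatch::set(&latch) };     // drop the scope's own count
//     latch.wait(owner);                      // returns once the counter reaches zero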

/// `&L` without any implication of `dereferenceable` for `Latch::set`
pub(super) struct LatchRef<'a, L> {
    inner: *const L,
    marker: PhantomData<&'a L>,
}

impl<L> LatchRef<'_, L> {
    pub(super) fn new(inner: &L) -> LatchRef<'_, L> {
        LatchRef { inner, marker: PhantomData }
    }
}

unsafe impl<L: Sync> Sync for LatchRef<'_, L> {}

impl<L> Deref for LatchRef<'_, L> {
    type Target = L;

    fn deref(&self) -> &L {
        // SAFETY: if we have &self, the inner latch is still alive
        unsafe { &*self.inner }
    }
}

impl<L: Latch> Latch for LatchRef<'_, L> {
    #[inline]
    unsafe fn set(this: *const Self) {
        unsafe { L::set((*this).inner) };
    }
}
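
// A hedged sketch of why `LatchRef` exists (`run_job` is illustrative):
// passing a plain `&LockLatch` into `Latch::set` would let the compiler
// mark the reference `dereferenceable` for the whole call, even though
// the latch may be freed mid-call; the raw pointer stored inside
// `LatchRef` carries no such assertion.
//
//     fn run_job(latch: &LockLatch) {
//         let latch_ref = LatchRef::new(latch);
//         // `*latch_ref` reads like `&LockLatch`, while setting goes
//         // through the stored raw pointer:
//         unsafe { LatchRef::set(&latch_ref) };
//     }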