rustc_thread_pool/
job.rs

1use std::any::Any;
2use std::cell::UnsafeCell;
3use std::mem;
4use std::sync::Arc;
5
6use crossbeam_deque::{Injector, Steal};
7
8use crate::latch::Latch;
9use crate::tlv::Tlv;
10use crate::{tlv, unwind};
11
12pub(super) enum JobResult<T> {
13    None,
14    Ok(T),
15    Panic(Box<dyn Any + Send>),
16}
17
18/// A `Job` is used to advertise work for other threads that they may
19/// want to steal. In accordance with time honored tradition, jobs are
20/// arranged in a deque, so that thieves can take from the top of the
21/// deque while the main worker manages the bottom of the deque. This
22/// deque is managed by the `thread_pool` module.
23pub(super) trait Job {
24    /// Unsafe: this may be called from a different thread than the one
25    /// which scheduled the job, so the implementer must ensure the
26    /// appropriate traits are met, whether `Send`, `Sync`, or both.
27    unsafe fn execute(this: *const ());
28}
29
30/// Effectively a Job trait object. Each JobRef **must** be executed
31/// exactly once, or else data may leak.
32///
33/// Internally, we store the job's data in a `*const ()` pointer. The
34/// true type is something like `*const StackJob<...>`, but we hide
35/// it. We also carry the "execute fn" from the `Job` trait.
36pub(super) struct JobRef {
37    pointer: *const (),
38    execute_fn: unsafe fn(*const ()),
39}
40
41unsafe impl Send for JobRef {}
42unsafe impl Sync for JobRef {}
43
44impl JobRef {
45    /// Unsafe: caller asserts that `data` will remain valid until the
46    /// job is executed.
47    pub(super) unsafe fn new<T>(data: *const T) -> JobRef
48    where
49        T: Job,
50    {
51        // erase types:
52        JobRef { pointer: data as *const (), execute_fn: <T as Job>::execute }
53    }
54
55    /// Returns an opaque handle that can be saved and compared,
56    /// without making `JobRef` itself `Copy + Eq`.
57    #[inline]
58    pub(super) fn id(&self) -> impl Eq {
59        (self.pointer, self.execute_fn)
60    }
61
62    #[inline]
63    pub(super) unsafe fn execute(self) {
64        unsafe { (self.execute_fn)(self.pointer) }
65    }
66}
67
68/// A job that will be owned by a stack slot. This means that when it
69/// executes it need not free any heap data, the cleanup occurs when
70/// the stack frame is later popped. The function parameter indicates
71/// `true` if the job was stolen -- executed on a different thread.
72pub(super) struct StackJob<L, F, R>
73where
74    L: Latch + Sync,
75    F: FnOnce(bool) -> R + Send,
76    R: Send,
77{
78    pub(super) latch: L,
79    func: UnsafeCell<Option<F>>,
80    result: UnsafeCell<JobResult<R>>,
81    tlv: Tlv,
82}
83
84impl<L, F, R> StackJob<L, F, R>
85where
86    L: Latch + Sync,
87    F: FnOnce(bool) -> R + Send,
88    R: Send,
89{
90    pub(super) fn new(tlv: Tlv, func: F, latch: L) -> StackJob<L, F, R> {
91        StackJob {
92            latch,
93            func: UnsafeCell::new(Some(func)),
94            result: UnsafeCell::new(JobResult::None),
95            tlv,
96        }
97    }
98
99    pub(super) unsafe fn as_job_ref(&self) -> JobRef {
100        unsafe { JobRef::new(self) }
101    }
102
103    pub(super) unsafe fn run_inline(self, stolen: bool) -> R {
104        self.func.into_inner().unwrap()(stolen)
105    }
106
107    pub(super) unsafe fn into_result(self) -> R {
108        self.result.into_inner().into_return_value()
109    }
110}
111
112impl<L, F, R> Job for StackJob<L, F, R>
113where
114    L: Latch + Sync,
115    F: FnOnce(bool) -> R + Send,
116    R: Send,
117{
118    unsafe fn execute(this: *const ()) {
119        let this = unsafe { &*(this as *const Self) };
120        tlv::set(this.tlv);
121        let abort = unwind::AbortIfPanic;
122        let func = unsafe { (*this.func.get()).take().unwrap() };
123        unsafe {
124            (*this.result.get()) = JobResult::call(func);
125        }
126        unsafe {
127            Latch::set(&this.latch);
128        }
129        mem::forget(abort);
130    }
131}
132
133/// Represents a job stored in the heap. Used to implement
134/// `scope`. Unlike `StackJob`, when executed, `HeapJob` simply
135/// invokes a closure, which then triggers the appropriate logic to
136/// signal that the job executed.
137///
138/// (Probably `StackJob` should be refactored in a similar fashion.)
139pub(super) struct HeapJob<BODY>
140where
141    BODY: FnOnce() + Send,
142{
143    job: BODY,
144    tlv: Tlv,
145}
146
147impl<BODY> HeapJob<BODY>
148where
149    BODY: FnOnce() + Send,
150{
151    pub(super) fn new(tlv: Tlv, job: BODY) -> Box<Self> {
152        Box::new(HeapJob { job, tlv })
153    }
154
155    /// Creates a `JobRef` from this job -- note that this hides all
156    /// lifetimes, so it is up to you to ensure that this JobRef
157    /// doesn't outlive any data that it closes over.
158    pub(super) unsafe fn into_job_ref(self: Box<Self>) -> JobRef {
159        unsafe { JobRef::new(Box::into_raw(self)) }
160    }
161
162    /// Creates a static `JobRef` from this job.
163    pub(super) fn into_static_job_ref(self: Box<Self>) -> JobRef
164    where
165        BODY: 'static,
166    {
167        unsafe { self.into_job_ref() }
168    }
169}
170
171impl<BODY> Job for HeapJob<BODY>
172where
173    BODY: FnOnce() + Send,
174{
175    unsafe fn execute(this: *const ()) {
176        let this = unsafe { Box::from_raw(this as *mut Self) };
177        tlv::set(this.tlv);
178        (this.job)();
179    }
180}
181
182/// Represents a job stored in an `Arc` -- like `HeapJob`, but may
183/// be turned into multiple `JobRef`s and called multiple times.
184pub(super) struct ArcJob<BODY>
185where
186    BODY: Fn() + Send + Sync,
187{
188    job: BODY,
189}
190
191impl<BODY> ArcJob<BODY>
192where
193    BODY: Fn() + Send + Sync,
194{
195    pub(super) fn new(job: BODY) -> Arc<Self> {
196        Arc::new(ArcJob { job })
197    }
198
199    /// Creates a `JobRef` from this job -- note that this hides all
200    /// lifetimes, so it is up to you to ensure that this JobRef
201    /// doesn't outlive any data that it closes over.
202    pub(super) unsafe fn as_job_ref(this: &Arc<Self>) -> JobRef {
203        unsafe { JobRef::new(Arc::into_raw(Arc::clone(this))) }
204    }
205
206    /// Creates a static `JobRef` from this job.
207    pub(super) fn as_static_job_ref(this: &Arc<Self>) -> JobRef
208    where
209        BODY: 'static,
210    {
211        unsafe { Self::as_job_ref(this) }
212    }
213}
214
215impl<BODY> Job for ArcJob<BODY>
216where
217    BODY: Fn() + Send + Sync,
218{
219    unsafe fn execute(this: *const ()) {
220        let this = unsafe { Arc::from_raw(this as *mut Self) };
221        (this.job)();
222    }
223}
224
225impl<T> JobResult<T> {
226    fn call(func: impl FnOnce(bool) -> T) -> Self {
227        match unwind::halt_unwinding(|| func(true)) {
228            Ok(x) => JobResult::Ok(x),
229            Err(x) => JobResult::Panic(x),
230        }
231    }
232
233    /// Convert the `JobResult` for a job that has finished (and hence
234    /// its JobResult is populated) into its return value.
235    ///
236    /// NB. This will panic if the job panicked.
237    pub(super) fn into_return_value(self) -> T {
238        match self {
239            JobResult::None => unreachable!(),
240            JobResult::Ok(x) => x,
241            JobResult::Panic(x) => unwind::resume_unwinding(x),
242        }
243    }
244}
245
246/// Indirect queue to provide FIFO job priority.
247pub(super) struct JobFifo {
248    inner: Injector<JobRef>,
249}
250
251impl JobFifo {
252    pub(super) fn new() -> Self {
253        JobFifo { inner: Injector::new() }
254    }
255
256    pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef {
257        // A little indirection ensures that spawns are always prioritized in FIFO order. The
258        // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front
259        // (FIFO), but either way they will end up popping from the front of this queue.
260        self.inner.push(job_ref);
261        unsafe { JobRef::new(self) }
262    }
263}
264
265impl Job for JobFifo {
266    unsafe fn execute(this: *const ()) {
267        // We "execute" a queue by executing its first job, FIFO.
268        let this = unsafe { &*(this as *const Self) };
269        loop {
270            match this.inner.steal() {
271                Steal::Success(job_ref) => break unsafe { job_ref.execute() },
272                Steal::Empty => panic!("FIFO is empty"),
273                Steal::Retry => {}
274            }
275        }
276    }
277}