// rustc_thread_pool/job.rs

1use std::any::Any;
2use std::cell::UnsafeCell;
3use std::mem;
4use std::sync::Arc;
5
6use crossbeam_deque::{Injector, Steal};
7
8use crate::latch::Latch;
9use crate::tlv::Tlv;
10use crate::{tlv, unwind};
11
/// Outcome slot for a job: starts out as `None`, and is later filled
/// with either the closure's return value (`Ok`) or the payload the
/// closure panicked with (`Panic`).
pub(super) enum JobResult<T> {
    None,
    Ok(T),
    Panic(Box<dyn Any + Send>),
}
17
/// A `Job` is used to advertise work for other threads that they may
/// want to steal. In accordance with time honored tradition, jobs are
/// arranged in a deque, so that thieves can take from the top of the
/// deque while the main worker manages the bottom of the deque. This
/// deque is managed by the `thread_pool` module.
pub(super) trait Job {
    /// Executes the job whose type-erased data begins at `this`.
    ///
    /// Unsafe: this may be called from a different thread than the one
    /// which scheduled the job, so the implementer must ensure the
    /// appropriate traits are met, whether `Send`, `Sync`, or both.
    unsafe fn execute(this: *const ());
}
29
/// Identity for a pending job, derived from the address of its
/// type-erased data pointer (see `JobRef::id`).
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub(super) struct JobRefId {
    pointer: usize,
}
34
/// Effectively a Job trait object. Each JobRef **must** be executed
/// exactly once, or else data may leak.
///
/// Internally, we store the job's data in a `*const ()` pointer. The
/// true type is something like `*const StackJob<...>`, but we hide
/// it. We also carry the "execute fn" from the `Job` trait.
pub(super) struct JobRef {
    // Type-erased pointer to the job's data.
    pointer: *const (),
    // Monomorphized `<T as Job>::execute` for the erased type.
    execute_fn: unsafe fn(*const ()),
}
45
// SAFETY: per the `Job::execute` contract, the implementer must ensure
// the job data is safe to run from another thread (`Send`/`Sync` as
// appropriate), so the type-erased handle may cross threads.
unsafe impl Send for JobRef {}
unsafe impl Sync for JobRef {}
48
impl JobRef {
    /// Unsafe: caller asserts that `data` will remain valid until the
    /// job is executed.
    pub(super) unsafe fn new<T>(data: *const T) -> JobRef
    where
        T: Job,
    {
        // erase types:
        JobRef { pointer: data as *const (), execute_fn: <T as Job>::execute }
    }

    /// Returns an id based on the address of the job's data.
    #[inline]
    pub(super) fn id(&self) -> JobRefId {
        JobRefId { pointer: self.pointer.expose_provenance() }
    }

    /// Runs the job. Consumes `self`, so it can happen at most once per
    /// `JobRef`.
    ///
    /// Unsafe: the data pointer must still be valid (see `new`); the
    /// call may reclaim ownership of that data (e.g. `HeapJob`,
    /// `ArcJob`), after which it must not be used again.
    #[inline]
    pub(super) unsafe fn execute(self) {
        unsafe { (self.execute_fn)(self.pointer) }
    }
}
70
/// A job that will be owned by a stack slot. This means that when it
/// executes it need not free any heap data, the cleanup occurs when
/// the stack frame is later popped. The function parameter indicates
/// `true` if the job was stolen -- executed on a different thread.
pub(super) struct StackJob<L, F, R>
where
    L: Latch + Sync,
    F: FnOnce(bool) -> R + Send,
    R: Send,
{
    // Set after the result has been written, signalling the owner.
    pub(super) latch: L,
    // The closure to run; taken (replaced with `None`) when executed.
    func: UnsafeCell<Option<F>>,
    // Written before the latch is set; read back by `into_result`.
    result: UnsafeCell<JobResult<R>>,
    // Thread-local value installed (via `tlv::set`) before running.
    tlv: Tlv,
}
86
impl<L, F, R> StackJob<L, F, R>
where
    L: Latch + Sync,
    F: FnOnce(bool) -> R + Send,
    R: Send,
{
    /// Creates a job with the closure pending and an empty result slot.
    pub(super) fn new(tlv: Tlv, func: F, latch: L) -> StackJob<L, F, R> {
        StackJob {
            latch,
            func: UnsafeCell::new(Some(func)),
            result: UnsafeCell::new(JobResult::None),
            tlv,
        }
    }

    /// Creates a type-erased handle to this job.
    ///
    /// Unsafe: caller must keep `self` alive until the handle has been
    /// executed (see `JobRef::new`).
    pub(super) unsafe fn as_job_ref(&self) -> JobRef {
        unsafe { JobRef::new(self) }
    }

    /// Runs the job directly on the current thread, bypassing the
    /// `JobRef` machinery. `stolen` is forwarded to the closure.
    ///
    /// Unsafe: must be called at most once, and never concurrently with
    /// `execute` -- both take the closure out of `self.func`.
    pub(super) unsafe fn run_inline(&self, stolen: bool) {
        unsafe {
            let func = (*self.func.get()).take().unwrap();
            // Contain a panic rather than unwinding; the payload is
            // stored so `into_result` can resume the unwind later.
            *(self.result.get()) = match unwind::halt_unwinding(|| func(stolen)) {
                Ok(x) => JobResult::Ok(x),
                Err(x) => JobResult::Panic(x),
            };
            // The result must be fully written before the latch is set:
            // the owner may read `result` as soon as the latch opens.
            Latch::set(&self.latch);
        }
    }

    /// Consumes the job and returns the closure's result, resuming a
    /// captured panic if there was one. Only meaningful once the result
    /// slot has been populated (i.e. after the latch was set).
    pub(super) unsafe fn into_result(self) -> R {
        self.result.into_inner().into_return_value()
    }
}
121
impl<L, F, R> Job for StackJob<L, F, R>
where
    L: Latch + Sync,
    F: FnOnce(bool) -> R + Send,
    R: Send,
{
    unsafe fn execute(this: *const ()) {
        let this = unsafe { &*(this as *const Self) };
        tlv::set(this.tlv);
        // Abort the process if anything below unwinds: panics from the
        // user closure are contained by `halt_unwinding` (inside
        // `JobResult::call`), so any unwind escaping this frame would
        // leave the latch unset and the owner blocked forever.
        let abort = unwind::AbortIfPanic;
        let func = unsafe { (*this.func.get()).take().unwrap() };
        unsafe {
            // `JobResult::call` invokes `func(true)` -- execution via a
            // `JobRef` means the job was stolen.
            (*this.result.get()) = JobResult::call(func);
        }
        unsafe {
            // Write the result before setting the latch; once the latch
            // is set the owner may read the result and pop the stack
            // frame, so `this` must not be touched afterwards.
            Latch::set(&this.latch);
        }
        mem::forget(abort);
    }
}
142
/// Represents a job stored in the heap. Used to implement
/// `scope`. Unlike `StackJob`, when executed, `HeapJob` simply
/// invokes a closure, which then triggers the appropriate logic to
/// signal that the job executed.
///
/// (Probably `StackJob` should be refactored in a similar fashion.)
pub(super) struct HeapJob<BODY>
where
    BODY: FnOnce(JobRefId) + Send,
{
    // Closure run once when the job executes; receives the id of the
    // `JobRef` that advertised it.
    job: BODY,
    // Thread-local value installed (via `tlv::set`) before running.
    tlv: Tlv,
}
156
157impl<BODY> HeapJob<BODY>
158where
159    BODY: FnOnce(JobRefId) + Send,
160{
161    pub(super) fn new(tlv: Tlv, job: BODY) -> Box<Self> {
162        Box::new(HeapJob { job, tlv })
163    }
164
165    /// Creates a `JobRef` from this job -- note that this hides all
166    /// lifetimes, so it is up to you to ensure that this JobRef
167    /// doesn't outlive any data that it closes over.
168    pub(super) unsafe fn into_job_ref(self: Box<Self>) -> JobRef {
169        unsafe { JobRef::new(Box::into_raw(self)) }
170    }
171
172    /// Creates a static `JobRef` from this job.
173    pub(super) fn into_static_job_ref(self: Box<Self>) -> JobRef
174    where
175        BODY: 'static,
176    {
177        unsafe { self.into_job_ref() }
178    }
179}
180
impl<BODY> Job for HeapJob<BODY>
where
    BODY: FnOnce(JobRefId) + Send,
{
    unsafe fn execute(this: *const ()) {
        // Capture the id first; the closure receives the id of the
        // `JobRef` that advertised this job.
        let pointer = this.expose_provenance();
        // Reclaim ownership of the heap allocation leaked by
        // `into_job_ref`; it is freed when `this` drops at the end of
        // this call.
        let this = unsafe { Box::from_raw(this as *mut Self) };
        tlv::set(this.tlv);
        (this.job)(JobRefId { pointer });
    }
}
192
/// Represents a job stored in an `Arc` -- like `HeapJob`, but may
/// be turned into multiple `JobRef`s and called multiple times.
pub(super) struct ArcJob<BODY>
where
    BODY: Fn(JobRefId) + Send + Sync,
{
    // `Fn` (not `FnOnce`) because the job may run once per `JobRef`.
    job: BODY,
}
201
impl<BODY> ArcJob<BODY>
where
    BODY: Fn(JobRefId) + Send + Sync,
{
    pub(super) fn new(job: BODY) -> Arc<Self> {
        Arc::new(ArcJob { job })
    }

    /// Creates a `JobRef` from this job -- note that this hides all
    /// lifetimes, so it is up to you to ensure that this JobRef
    /// doesn't outlive any data that it closes over.
    ///
    /// Each call clones the `Arc` and leaks that reference into the
    /// returned `JobRef`; `execute` reclaims it via `Arc::from_raw`.
    pub(super) unsafe fn as_job_ref(this: &Arc<Self>) -> JobRef {
        unsafe { JobRef::new(Arc::into_raw(Arc::clone(this))) }
    }

    /// Creates a static `JobRef` from this job.
    pub(super) fn as_static_job_ref(this: &Arc<Self>) -> JobRef
    where
        BODY: 'static,
    {
        // Sound because `BODY: 'static` means there is no borrowed data
        // for the resulting `JobRef` to outlive.
        unsafe { Self::as_job_ref(this) }
    }
}
225
impl<BODY> Job for ArcJob<BODY>
where
    BODY: Fn(JobRefId) + Send + Sync,
{
    unsafe fn execute(this: *const ()) {
        // Capture the id first; the closure receives the id of the
        // `JobRef` that advertised this job.
        let pointer = this.expose_provenance();
        // Reclaim the reference leaked by `as_job_ref`; the refcount is
        // decremented when `this` drops at the end of this call.
        let this = unsafe { Arc::from_raw(this as *mut Self) };
        (this.job)(JobRefId { pointer });
    }
}
236
237impl<T> JobResult<T> {
238    fn call(func: impl FnOnce(bool) -> T) -> Self {
239        match unwind::halt_unwinding(|| func(true)) {
240            Ok(x) => JobResult::Ok(x),
241            Err(x) => JobResult::Panic(x),
242        }
243    }
244
245    /// Convert the `JobResult` for a job that has finished (and hence
246    /// its JobResult is populated) into its return value.
247    ///
248    /// NB. This will panic if the job panicked.
249    pub(super) fn into_return_value(self) -> T {
250        match self {
251            JobResult::None => unreachable!(),
252            JobResult::Ok(x) => x,
253            JobResult::Panic(x) => unwind::resume_unwinding(x),
254        }
255    }
256}
257
/// Indirect queue to provide FIFO job priority.
pub(super) struct JobFifo {
    // The queued jobs, stolen back in arrival (FIFO) order.
    inner: Injector<JobRef>,
}
262
impl JobFifo {
    pub(super) fn new() -> Self {
        JobFifo { inner: Injector::new() }
    }

    /// Enqueues `job_ref` and returns a `JobRef` that points at this
    /// queue itself; executing the returned ref pops and runs the
    /// oldest queued job.
    ///
    /// Unsafe: caller must keep `self` alive until the returned
    /// `JobRef` has been executed (see `JobRef::new`).
    pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef {
        // A little indirection ensures that spawns are always prioritized in FIFO order. The
        // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front
        // (FIFO), but either way they will end up popping from the front of this queue.
        self.inner.push(job_ref);
        unsafe { JobRef::new(self) }
    }
}
276
impl Job for JobFifo {
    unsafe fn execute(this: *const ()) {
        // We "execute" a queue by executing its first job, FIFO.
        let this = unsafe { &*(this as *const Self) };
        loop {
            match this.inner.steal() {
                Steal::Success(job_ref) => break unsafe { job_ref.execute() },
                // `push` pairs every queued job with one `JobRef` to the
                // queue, so a matching job must still be present.
                Steal::Empty => panic!("FIFO is empty"),
                // `steal` may fail spuriously under contention; retry.
                Steal::Retry => {}
            }
        }
    }
}