1use std::any::Any;
2use std::cell::UnsafeCell;
3use std::mem;
4use std::sync::Arc;
5
6use crossbeam_deque::{Injector, Steal};
7
8use crate::latch::Latch;
9use crate::tlv::Tlv;
10use crate::{tlv, unwind};
11
/// Outcome slot for a job's closure.
///
/// `StackJob::new` initializes the slot to `None`; `JobResult::call` and
/// `StackJob::run_inline` produce `Ok` or `Panic` from the value returned by
/// `unwind::halt_unwinding`.
pub(super) enum JobResult<T> {
    /// No outcome has been stored. `into_return_value` treats this variant as
    /// unreachable.
    None,
    /// The closure completed and produced this value.
    Ok(T),
    /// The error payload returned by `unwind::halt_unwinding` (presumably a
    /// caught panic); `into_return_value` passes it to
    /// `unwind::resume_unwinding`.
    Panic(Box<dyn Any + Send>),
}
17
/// A unit of work that can be run through a type-erased pointer.
///
/// `JobRef::new` stores `<T as Job>::execute` next to a `*const ()` pointing
/// at the job's data, and `JobRef::execute` later calls it with that pointer.
pub(super) trait Job {
    /// Runs the job addressed by `this`.
    ///
    /// # Safety
    ///
    /// The implementations in this file cast `this` back to a pointer to
    /// `Self` and dereference it (or take ownership through it), so `this`
    /// must be the pointer the corresponding `JobRef` was built from and must
    /// still be valid when this is called.
    unsafe fn execute(this: *const ());
}
29
/// Comparable, hashable identity of a job.
///
/// Built from the address of the job's data pointer: `JobRef::id` and the
/// `HeapJob`/`ArcJob` `execute` implementations all fill it with
/// `expose_provenance()` of that pointer.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub(super) struct JobRefId {
    /// Address of the job's data, as an integer.
    pointer: usize,
}
34
/// A type-erased handle to a `Job`: a data pointer plus the function that
/// knows how to run it.
pub(super) struct JobRef {
    /// Pointer to the job's data with its concrete type erased.
    pointer: *const (),
    /// The `<T as Job>::execute` function captured in `JobRef::new`.
    execute_fn: unsafe fn(*const ()),
}

// NOTE(review): these impls assert that a `JobRef` may be sent to and shared
// between threads even though it holds a raw pointer. The compiler cannot
// check this; presumably every `Job` implementor is responsible for being safe
// to execute from another thread — confirm against the callers.
unsafe impl Send for JobRef {}
unsafe impl Sync for JobRef {}
48
49impl JobRef {
50 pub(super) unsafe fn new<T>(data: *const T) -> JobRef
53 where
54 T: Job,
55 {
56 JobRef { pointer: data as *const (), execute_fn: <T as Job>::execute }
58 }
59
60 #[inline]
61 pub(super) fn id(&self) -> JobRefId {
62 JobRefId { pointer: self.pointer.expose_provenance() }
63 }
64
65 #[inline]
66 pub(super) unsafe fn execute(self) {
67 unsafe { (self.execute_fn)(self.pointer) }
68 }
69}
70
/// A job that keeps its closure, its result slot and its latch inline in the
/// struct itself.
///
/// `as_job_ref` hands out a `JobRef` pointing directly at this value, and both
/// `run_inline` and `Job::execute` take the closure out of `func`, store the
/// outcome in `result`, and then set `latch`.
pub(super) struct StackJob<L, F, R>
where
    L: Latch + Sync,
    F: FnOnce(bool) -> R + Send,
    R: Send,
{
    /// Latch that is set once the outcome has been written to `result`.
    pub(super) latch: L,
    /// The closure to run; `Some` until it is taken for execution.
    func: UnsafeCell<Option<F>>,
    /// Where the closure's outcome is stored; starts as `JobResult::None`.
    result: UnsafeCell<JobResult<R>>,
    /// Value passed to `tlv::set` before the closure runs in `Job::execute`.
    tlv: Tlv,
}
86
87impl<L, F, R> StackJob<L, F, R>
88where
89 L: Latch + Sync,
90 F: FnOnce(bool) -> R + Send,
91 R: Send,
92{
93 pub(super) fn new(tlv: Tlv, func: F, latch: L) -> StackJob<L, F, R> {
94 StackJob {
95 latch,
96 func: UnsafeCell::new(Some(func)),
97 result: UnsafeCell::new(JobResult::None),
98 tlv,
99 }
100 }
101
102 pub(super) unsafe fn as_job_ref(&self) -> JobRef {
103 unsafe { JobRef::new(self) }
104 }
105
106 pub(super) unsafe fn run_inline(&self, stolen: bool) {
107 unsafe {
108 let func = (*self.func.get()).take().unwrap();
109 *(self.result.get()) = match unwind::halt_unwinding(|| func(stolen)) {
110 Ok(x) => JobResult::Ok(x),
111 Err(x) => JobResult::Panic(x),
112 };
113 Latch::set(&self.latch);
114 }
115 }
116
117 pub(super) unsafe fn into_result(self) -> R {
118 self.result.into_inner().into_return_value()
119 }
120}
121
122impl<L, F, R> Job for StackJob<L, F, R>
123where
124 L: Latch + Sync,
125 F: FnOnce(bool) -> R + Send,
126 R: Send,
127{
128 unsafe fn execute(this: *const ()) {
129 let this = unsafe { &*(this as *const Self) };
130 tlv::set(this.tlv);
131 let abort = unwind::AbortIfPanic;
132 let func = unsafe { (*this.func.get()).take().unwrap() };
133 unsafe {
134 (*this.result.get()) = JobResult::call(func);
135 }
136 unsafe {
137 Latch::set(&this.latch);
138 }
139 mem::forget(abort);
140 }
141}
142
/// A one-shot job that is boxed by `HeapJob::new` and reclaimed with
/// `Box::from_raw` when it is executed.
pub(super) struct HeapJob<BODY>
where
    BODY: FnOnce(JobRefId) + Send,
{
    /// The closure to run; it receives the id of this job.
    job: BODY,
    /// Value passed to `tlv::set` before the closure runs.
    tlv: Tlv,
}
156
157impl<BODY> HeapJob<BODY>
158where
159 BODY: FnOnce(JobRefId) + Send,
160{
161 pub(super) fn new(tlv: Tlv, job: BODY) -> Box<Self> {
162 Box::new(HeapJob { job, tlv })
163 }
164
165 pub(super) unsafe fn into_job_ref(self: Box<Self>) -> JobRef {
169 unsafe { JobRef::new(Box::into_raw(self)) }
170 }
171
172 pub(super) fn into_static_job_ref(self: Box<Self>) -> JobRef
174 where
175 BODY: 'static,
176 {
177 unsafe { self.into_job_ref() }
178 }
179}
180
181impl<BODY> Job for HeapJob<BODY>
182where
183 BODY: FnOnce(JobRefId) + Send,
184{
185 unsafe fn execute(this: *const ()) {
186 let pointer = this.expose_provenance();
187 let this = unsafe { Box::from_raw(this as *mut Self) };
188 tlv::set(this.tlv);
189 (this.job)(JobRefId { pointer });
190 }
191}
192
/// A reusable job held in an `Arc`: each handle created by `as_job_ref` owns
/// one cloned reference, which `Job::execute` takes back with
/// `Arc::from_raw`.
pub(super) struct ArcJob<BODY>
where
    BODY: Fn(JobRefId) + Send + Sync,
{
    /// The closure to run; it receives the id of this job.
    job: BODY,
}
201
202impl<BODY> ArcJob<BODY>
203where
204 BODY: Fn(JobRefId) + Send + Sync,
205{
206 pub(super) fn new(job: BODY) -> Arc<Self> {
207 Arc::new(ArcJob { job })
208 }
209
210 pub(super) unsafe fn as_job_ref(this: &Arc<Self>) -> JobRef {
214 unsafe { JobRef::new(Arc::into_raw(Arc::clone(this))) }
215 }
216
217 pub(super) fn as_static_job_ref(this: &Arc<Self>) -> JobRef
219 where
220 BODY: 'static,
221 {
222 unsafe { Self::as_job_ref(this) }
223 }
224}
225
226impl<BODY> Job for ArcJob<BODY>
227where
228 BODY: Fn(JobRefId) + Send + Sync,
229{
230 unsafe fn execute(this: *const ()) {
231 let pointer = this.expose_provenance();
232 let this = unsafe { Arc::from_raw(this as *mut Self) };
233 (this.job)(JobRefId { pointer });
234 }
235}
236
237impl<T> JobResult<T> {
238 fn call(func: impl FnOnce(bool) -> T) -> Self {
239 match unwind::halt_unwinding(|| func(true)) {
240 Ok(x) => JobResult::Ok(x),
241 Err(x) => JobResult::Panic(x),
242 }
243 }
244
245 pub(super) fn into_return_value(self) -> T {
250 match self {
251 JobResult::None => unreachable!(),
252 JobResult::Ok(x) => x,
253 JobResult::Panic(x) => unwind::resume_unwinding(x),
254 }
255 }
256}
257
/// A queue of `JobRef`s that is itself a `Job`: `push` enqueues a job and
/// returns a handle to the queue, and executing that handle runs one job
/// stolen from `inner`.
pub(super) struct JobFifo {
    /// Queue of pending jobs (a `crossbeam_deque::Injector`).
    inner: Injector<JobRef>,
}
262
263impl JobFifo {
264 pub(super) fn new() -> Self {
265 JobFifo { inner: Injector::new() }
266 }
267
268 pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef {
269 self.inner.push(job_ref);
273 unsafe { JobRef::new(self) }
274 }
275}
276
277impl Job for JobFifo {
278 unsafe fn execute(this: *const ()) {
279 let this = unsafe { &*(this as *const Self) };
281 loop {
282 match this.inner.steal() {
283 Steal::Success(job_ref) => break unsafe { job_ref.execute() },
284 Steal::Empty => panic!("FIFO is empty"),
285 Steal::Retry => {}
286 }
287 }
288 }
289}