1use std::any::Any;
2use std::cell::UnsafeCell;
3use std::mem;
4use std::sync::Arc;
5
6use crossbeam_deque::{Injector, Steal};
7
8use crate::latch::Latch;
9use crate::tlv::Tlv;
10use crate::{tlv, unwind};
11
12pub(super) enum JobResult<T> {
13 None,
14 Ok(T),
15 Panic(Box<dyn Any + Send>),
16}
17
18pub(super) trait Job {
24 unsafe fn execute(this: *const ());
28}
29
30pub(super) struct JobRef {
37 pointer: *const (),
38 execute_fn: unsafe fn(*const ()),
39}
40
41unsafe impl Send for JobRef {}
42unsafe impl Sync for JobRef {}
43
44impl JobRef {
45 pub(super) unsafe fn new<T>(data: *const T) -> JobRef
48 where
49 T: Job,
50 {
51 JobRef { pointer: data as *const (), execute_fn: <T as Job>::execute }
53 }
54
55 #[inline]
58 pub(super) fn id(&self) -> impl Eq {
59 (self.pointer, self.execute_fn)
60 }
61
62 #[inline]
63 pub(super) unsafe fn execute(self) {
64 unsafe { (self.execute_fn)(self.pointer) }
65 }
66}
67
68pub(super) struct StackJob<L, F, R>
73where
74 L: Latch + Sync,
75 F: FnOnce(bool) -> R + Send,
76 R: Send,
77{
78 pub(super) latch: L,
79 func: UnsafeCell<Option<F>>,
80 result: UnsafeCell<JobResult<R>>,
81 tlv: Tlv,
82}
83
84impl<L, F, R> StackJob<L, F, R>
85where
86 L: Latch + Sync,
87 F: FnOnce(bool) -> R + Send,
88 R: Send,
89{
90 pub(super) fn new(tlv: Tlv, func: F, latch: L) -> StackJob<L, F, R> {
91 StackJob {
92 latch,
93 func: UnsafeCell::new(Some(func)),
94 result: UnsafeCell::new(JobResult::None),
95 tlv,
96 }
97 }
98
99 pub(super) unsafe fn as_job_ref(&self) -> JobRef {
100 unsafe { JobRef::new(self) }
101 }
102
103 pub(super) unsafe fn run_inline(self, stolen: bool) -> R {
104 self.func.into_inner().unwrap()(stolen)
105 }
106
107 pub(super) unsafe fn into_result(self) -> R {
108 self.result.into_inner().into_return_value()
109 }
110}
111
112impl<L, F, R> Job for StackJob<L, F, R>
113where
114 L: Latch + Sync,
115 F: FnOnce(bool) -> R + Send,
116 R: Send,
117{
118 unsafe fn execute(this: *const ()) {
119 let this = unsafe { &*(this as *const Self) };
120 tlv::set(this.tlv);
121 let abort = unwind::AbortIfPanic;
122 let func = unsafe { (*this.func.get()).take().unwrap() };
123 unsafe {
124 (*this.result.get()) = JobResult::call(func);
125 }
126 unsafe {
127 Latch::set(&this.latch);
128 }
129 mem::forget(abort);
130 }
131}
132
133pub(super) struct HeapJob<BODY>
140where
141 BODY: FnOnce() + Send,
142{
143 job: BODY,
144 tlv: Tlv,
145}
146
147impl<BODY> HeapJob<BODY>
148where
149 BODY: FnOnce() + Send,
150{
151 pub(super) fn new(tlv: Tlv, job: BODY) -> Box<Self> {
152 Box::new(HeapJob { job, tlv })
153 }
154
155 pub(super) unsafe fn into_job_ref(self: Box<Self>) -> JobRef {
159 unsafe { JobRef::new(Box::into_raw(self)) }
160 }
161
162 pub(super) fn into_static_job_ref(self: Box<Self>) -> JobRef
164 where
165 BODY: 'static,
166 {
167 unsafe { self.into_job_ref() }
168 }
169}
170
171impl<BODY> Job for HeapJob<BODY>
172where
173 BODY: FnOnce() + Send,
174{
175 unsafe fn execute(this: *const ()) {
176 let this = unsafe { Box::from_raw(this as *mut Self) };
177 tlv::set(this.tlv);
178 (this.job)();
179 }
180}
181
182pub(super) struct ArcJob<BODY>
185where
186 BODY: Fn() + Send + Sync,
187{
188 job: BODY,
189}
190
191impl<BODY> ArcJob<BODY>
192where
193 BODY: Fn() + Send + Sync,
194{
195 pub(super) fn new(job: BODY) -> Arc<Self> {
196 Arc::new(ArcJob { job })
197 }
198
199 pub(super) unsafe fn as_job_ref(this: &Arc<Self>) -> JobRef {
203 unsafe { JobRef::new(Arc::into_raw(Arc::clone(this))) }
204 }
205
206 pub(super) fn as_static_job_ref(this: &Arc<Self>) -> JobRef
208 where
209 BODY: 'static,
210 {
211 unsafe { Self::as_job_ref(this) }
212 }
213}
214
215impl<BODY> Job for ArcJob<BODY>
216where
217 BODY: Fn() + Send + Sync,
218{
219 unsafe fn execute(this: *const ()) {
220 let this = unsafe { Arc::from_raw(this as *mut Self) };
221 (this.job)();
222 }
223}
224
225impl<T> JobResult<T> {
226 fn call(func: impl FnOnce(bool) -> T) -> Self {
227 match unwind::halt_unwinding(|| func(true)) {
228 Ok(x) => JobResult::Ok(x),
229 Err(x) => JobResult::Panic(x),
230 }
231 }
232
233 pub(super) fn into_return_value(self) -> T {
238 match self {
239 JobResult::None => unreachable!(),
240 JobResult::Ok(x) => x,
241 JobResult::Panic(x) => unwind::resume_unwinding(x),
242 }
243 }
244}
245
246pub(super) struct JobFifo {
248 inner: Injector<JobRef>,
249}
250
251impl JobFifo {
252 pub(super) fn new() -> Self {
253 JobFifo { inner: Injector::new() }
254 }
255
256 pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef {
257 self.inner.push(job_ref);
261 unsafe { JobRef::new(self) }
262 }
263}
264
265impl Job for JobFifo {
266 unsafe fn execute(this: *const ()) {
267 let this = unsafe { &*(this as *const Self) };
269 loop {
270 match this.inner.steal() {
271 Steal::Success(job_ref) => break unsafe { job_ref.execute() },
272 Steal::Empty => panic!("FIFO is empty"),
273 Steal::Retry => {}
274 }
275 }
276 }
277}