1use std::mem;
4use std::sync::atomic::Ordering::Relaxed;
5use std::task::Poll;
6use std::time::{Duration, SystemTime};
7
8use either::Either;
9use rand::seq::IteratorRandom;
10use rustc_abi::ExternAbi;
11use rustc_const_eval::CTRL_C_RECEIVED;
12use rustc_data_structures::fx::FxHashMap;
13use rustc_hir::def_id::DefId;
14use rustc_index::{Idx, IndexVec};
15use rustc_middle::mir::Mutability;
16use rustc_middle::ty::layout::TyAndLayout;
17use rustc_span::Span;
18
19use crate::concurrency::GlobalDataRaceHandler;
20use crate::shims::tls;
21use crate::*;
22
/// What the scheduler decided the interpreter loop should do next.
///
/// Equality on this type is total, so `Eq` is derived alongside `PartialEq`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum SchedulingAction {
    /// Execute one interpreter step on the currently active thread.
    ExecuteStep,
    /// Run the unblock callback of a blocked thread whose timeout has expired.
    ExecuteTimeoutCallback,
    /// Sleep for the given duration before scheduling again.
    Sleep(Duration),
}
32
/// What to do with the thread-local statics of a thread when it terminates.
///
/// Equality on this type is total, so `Eq` is derived alongside `PartialEq`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TlsAllocAction {
    /// Deallocate the backing memory of the thread-local statics.
    Deallocate,
    /// Keep the memory alive and register the allocations as static roots
    /// instead of freeing them.
    Leak,
}
42
/// Why a blocked thread's unblock callback is being invoked.
///
/// Equality on this type is total, so `Eq` is derived alongside `PartialEq`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum UnblockKind {
    /// The thread was explicitly unblocked because what it waited for happened.
    Ready,
    /// The thread's timeout expired before it was unblocked.
    TimedOut,
}
51
52pub type DynUnblockCallback<'tcx> = DynMachineCallback<'tcx, UnblockKind>;
55
/// Identifier of an interpreted thread; wraps the thread's index as a `u32`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ThreadId(u32);
59
60impl ThreadId {
61 pub fn to_u32(self) -> u32 {
62 self.0
63 }
64
65 pub fn new_unchecked(id: u32) -> Self {
67 Self(id)
68 }
69
70 pub const MAIN_THREAD: ThreadId = ThreadId(0);
71}
72
73impl Idx for ThreadId {
74 fn new(idx: usize) -> Self {
75 ThreadId(u32::try_from(idx).unwrap())
76 }
77
78 fn index(self) -> usize {
79 usize::try_from(self.0).unwrap()
80 }
81}
82
83impl From<ThreadId> for u64 {
84 fn from(t: ThreadId) -> Self {
85 t.0.into()
86 }
87}
88
89#[derive(Debug, Copy, Clone, PartialEq, Eq)]
91pub enum BlockReason {
92 Join(ThreadId),
95 Sleep,
97 Mutex,
99 Condvar(CondvarId),
101 RwLock,
103 Futex,
105 InitOnce(InitOnceId),
107 Epoll,
109 Eventfd,
111 UnnamedSocket,
113}
114
115enum ThreadState<'tcx> {
117 Enabled,
119 Blocked { reason: BlockReason, timeout: Option<Timeout>, callback: DynUnblockCallback<'tcx> },
121 Terminated,
124}
125
126impl<'tcx> std::fmt::Debug for ThreadState<'tcx> {
127 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 match self {
129 Self::Enabled => write!(f, "Enabled"),
130 Self::Blocked { reason, timeout, .. } =>
131 f.debug_struct("Blocked").field("reason", reason).field("timeout", timeout).finish(),
132 Self::Terminated => write!(f, "Terminated"),
133 }
134 }
135}
136
137impl<'tcx> ThreadState<'tcx> {
138 fn is_enabled(&self) -> bool {
139 matches!(self, ThreadState::Enabled)
140 }
141
142 fn is_terminated(&self) -> bool {
143 matches!(self, ThreadState::Terminated)
144 }
145
146 fn is_blocked_on(&self, reason: BlockReason) -> bool {
147 matches!(*self, ThreadState::Blocked { reason: actual_reason, .. } if actual_reason == reason)
148 }
149}
150
/// Whether a thread can still be joined or detached.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ThreadJoinStatus {
    /// The thread has been neither detached nor joined yet.
    Joinable,
    /// The thread was detached; joining it is reported as undefined behavior.
    Detached,
    /// Some thread has joined (or is blocked joining) this thread.
    Joined,
}
162
163pub struct Thread<'tcx> {
165 state: ThreadState<'tcx>,
166
167 thread_name: Option<Vec<u8>>,
169
170 stack: Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>>,
172
173 pub(crate) on_stack_empty: Option<StackEmptyCallback<'tcx>>,
178
179 top_user_relevant_frame: Option<usize>,
185
186 join_status: ThreadJoinStatus,
188
189 pub(crate) panic_payloads: Vec<ImmTy<'tcx>>,
198
199 pub(crate) last_error: Option<MPlaceTy<'tcx>>,
201}
202
203pub type StackEmptyCallback<'tcx> =
204 Box<dyn FnMut(&mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Poll<()>> + 'tcx>;
205
206impl<'tcx> Thread<'tcx> {
207 fn thread_name(&self) -> Option<&[u8]> {
209 self.thread_name.as_deref()
210 }
211
212 fn thread_display_name(&self, id: ThreadId) -> String {
214 if let Some(ref thread_name) = self.thread_name {
215 String::from_utf8_lossy(thread_name).into_owned()
216 } else {
217 format!("unnamed-{}", id.index())
218 }
219 }
220
221 fn compute_top_user_relevant_frame(&self, skip: usize) -> Option<usize> {
227 self.stack
228 .iter()
229 .enumerate()
230 .rev()
231 .skip(skip)
232 .find_map(|(idx, frame)| if frame.extra.is_user_relevant { Some(idx) } else { None })
233 }
234
235 pub fn recompute_top_user_relevant_frame(&mut self, skip: usize) {
238 self.top_user_relevant_frame = self.compute_top_user_relevant_frame(skip);
239 }
240
241 pub fn set_top_user_relevant_frame(&mut self, frame_idx: usize) {
244 debug_assert_eq!(Some(frame_idx), self.compute_top_user_relevant_frame(0));
245 self.top_user_relevant_frame = Some(frame_idx);
246 }
247
248 pub fn top_user_relevant_frame(&self) -> Option<usize> {
251 debug_assert_eq!(self.top_user_relevant_frame, self.compute_top_user_relevant_frame(0));
252 self.top_user_relevant_frame.or_else(|| self.stack.len().checked_sub(1))
256 }
257
258 pub fn current_span(&self) -> Span {
259 self.top_user_relevant_frame()
260 .map(|frame_idx| self.stack[frame_idx].current_span())
261 .unwrap_or(rustc_span::DUMMY_SP)
262 }
263}
264
265impl<'tcx> std::fmt::Debug for Thread<'tcx> {
266 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267 write!(
268 f,
269 "{}({:?}, {:?})",
270 String::from_utf8_lossy(self.thread_name().unwrap_or(b"<unnamed>")),
271 self.state,
272 self.join_status
273 )
274 }
275}
276
277impl<'tcx> Thread<'tcx> {
278 fn new(name: Option<&str>, on_stack_empty: Option<StackEmptyCallback<'tcx>>) -> Self {
279 Self {
280 state: ThreadState::Enabled,
281 thread_name: name.map(|name| Vec::from(name.as_bytes())),
282 stack: Vec::new(),
283 top_user_relevant_frame: None,
284 join_status: ThreadJoinStatus::Joinable,
285 panic_payloads: Vec::new(),
286 last_error: None,
287 on_stack_empty,
288 }
289 }
290}
291
292impl VisitProvenance for Thread<'_> {
293 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
294 let Thread {
295 panic_payloads: panic_payload,
296 last_error,
297 stack,
298 top_user_relevant_frame: _,
299 state: _,
300 thread_name: _,
301 join_status: _,
302 on_stack_empty: _, } = self;
304
305 for payload in panic_payload {
306 payload.visit_provenance(visit);
307 }
308 last_error.visit_provenance(visit);
309 for frame in stack {
310 frame.visit_provenance(visit)
311 }
312 }
313}
314
315impl VisitProvenance for Frame<'_, Provenance, FrameExtra<'_>> {
316 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
317 let Frame {
318 return_place,
319 locals,
320 extra,
321 ..
323 } = self;
324
325 return_place.visit_provenance(visit);
327 for local in locals.iter() {
329 match local.as_mplace_or_imm() {
330 None => {}
331 Some(Either::Left((ptr, meta))) => {
332 ptr.visit_provenance(visit);
333 meta.visit_provenance(visit);
334 }
335 Some(Either::Right(imm)) => {
336 imm.visit_provenance(visit);
337 }
338 }
339 }
340
341 extra.visit_provenance(visit);
342 }
343}
344
345#[derive(Debug)]
347enum Timeout {
348 Monotonic(Instant),
349 RealTime(SystemTime),
350}
351
352impl Timeout {
353 fn get_wait_time(&self, clock: &MonotonicClock) -> Duration {
355 match self {
356 Timeout::Monotonic(instant) => instant.duration_since(clock.now()),
357 Timeout::RealTime(time) =>
358 time.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO),
359 }
360 }
361
362 fn add_lossy(&self, duration: Duration) -> Self {
364 match self {
365 Timeout::Monotonic(i) => Timeout::Monotonic(i.add_lossy(duration)),
366 Timeout::RealTime(s) => {
367 Timeout::RealTime(
369 s.checked_add(duration)
370 .unwrap_or_else(|| s.checked_add(Duration::from_secs(3600)).unwrap()),
371 )
372 }
373 }
374 }
375}
376
/// The clock a timeout is measured against.
///
/// Derives `PartialEq`/`Eq` so callers can compare clock selections directly.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TimeoutClock {
    /// The interpreter's monotonic clock.
    Monotonic,
    /// The host's system (wall-clock) time.
    RealTime,
}
383
/// The reference point a timeout duration is added to.
///
/// Derives `PartialEq`/`Eq` so callers can compare anchors directly.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TimeoutAnchor {
    /// The duration is counted from the current time of the chosen clock.
    Relative,
    /// The duration is counted from the epoch of the chosen clock.
    Absolute,
}
390
/// Error returned when a numeric value does not identify an existing thread.
///
/// Derives `PartialEq`/`Eq` so that `Result<_, ThreadNotFound>` values can be
/// compared, e.g. in assertions.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ThreadNotFound;
394
395#[derive(Debug)]
397pub struct ThreadManager<'tcx> {
398 active_thread: ThreadId,
400 threads: IndexVec<ThreadId, Thread<'tcx>>,
404 thread_local_allocs: FxHashMap<(DefId, ThreadId), StrictPointer>,
406 yield_active_thread: bool,
408 fixed_scheduling: bool,
410}
411
412impl VisitProvenance for ThreadManager<'_> {
413 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
414 let ThreadManager {
415 threads,
416 thread_local_allocs,
417 active_thread: _,
418 yield_active_thread: _,
419 fixed_scheduling: _,
420 } = self;
421
422 for thread in threads {
423 thread.visit_provenance(visit);
424 }
425 for ptr in thread_local_allocs.values() {
426 ptr.visit_provenance(visit);
427 }
428 }
429}
430
431impl<'tcx> ThreadManager<'tcx> {
432 pub(crate) fn new(config: &MiriConfig) -> Self {
433 let mut threads = IndexVec::new();
434 threads.push(Thread::new(Some("main"), None));
436 Self {
437 active_thread: ThreadId::MAIN_THREAD,
438 threads,
439 thread_local_allocs: Default::default(),
440 yield_active_thread: false,
441 fixed_scheduling: config.fixed_scheduling,
442 }
443 }
444
445 pub(crate) fn init(
446 ecx: &mut MiriInterpCx<'tcx>,
447 on_main_stack_empty: StackEmptyCallback<'tcx>,
448 ) {
449 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].on_stack_empty =
450 Some(on_main_stack_empty);
451 if ecx.tcx.sess.target.os.as_ref() != "windows" {
452 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].join_status =
454 ThreadJoinStatus::Detached;
455 }
456 }
457
458 pub fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadNotFound> {
459 if let Ok(id) = id.try_into()
460 && usize::try_from(id).is_ok_and(|id| id < self.threads.len())
461 {
462 Ok(ThreadId(id))
463 } else {
464 Err(ThreadNotFound)
465 }
466 }
467
468 fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option<StrictPointer> {
471 self.thread_local_allocs.get(&(def_id, self.active_thread)).cloned()
472 }
473
474 fn set_thread_local_alloc(&mut self, def_id: DefId, ptr: StrictPointer) {
479 self.thread_local_allocs.try_insert((def_id, self.active_thread), ptr).unwrap();
480 }
481
482 pub fn active_thread_stack(&self) -> &[Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
484 &self.threads[self.active_thread].stack
485 }
486
487 pub fn active_thread_stack_mut(
489 &mut self,
490 ) -> &mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
491 &mut self.threads[self.active_thread].stack
492 }
493
494 pub fn all_stacks(
495 &self,
496 ) -> impl Iterator<Item = (ThreadId, &[Frame<'tcx, Provenance, FrameExtra<'tcx>>])> {
497 self.threads.iter_enumerated().map(|(id, t)| (id, &t.stack[..]))
498 }
499
500 fn create_thread(&mut self, on_stack_empty: StackEmptyCallback<'tcx>) -> ThreadId {
502 let new_thread_id = ThreadId::new(self.threads.len());
503 self.threads.push(Thread::new(None, Some(on_stack_empty)));
504 new_thread_id
505 }
506
507 fn set_active_thread_id(&mut self, id: ThreadId) -> ThreadId {
509 assert!(id.index() < self.threads.len());
510 info!(
511 "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
512 self.get_thread_display_name(id),
513 self.get_thread_display_name(self.active_thread)
514 );
515 std::mem::replace(&mut self.active_thread, id)
516 }
517
518 pub fn active_thread(&self) -> ThreadId {
520 self.active_thread
521 }
522
523 pub fn get_total_thread_count(&self) -> usize {
525 self.threads.len()
526 }
527
528 pub fn get_live_thread_count(&self) -> usize {
531 self.threads.iter().filter(|t| !t.state.is_terminated()).count()
532 }
533
534 fn has_terminated(&self, thread_id: ThreadId) -> bool {
536 self.threads[thread_id].state.is_terminated()
537 }
538
539 fn have_all_terminated(&self) -> bool {
541 self.threads.iter().all(|thread| thread.state.is_terminated())
542 }
543
544 fn enable_thread(&mut self, thread_id: ThreadId) {
546 assert!(self.has_terminated(thread_id));
547 self.threads[thread_id].state = ThreadState::Enabled;
548 }
549
550 pub fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
552 &mut self.threads[self.active_thread]
553 }
554
555 pub fn active_thread_ref(&self) -> &Thread<'tcx> {
557 &self.threads[self.active_thread]
558 }
559
560 fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> {
569 trace!("detaching {:?}", id);
570
571 let is_ub = if allow_terminated_joined && self.threads[id].state.is_terminated() {
572 self.threads[id].join_status == ThreadJoinStatus::Detached
574 } else {
575 self.threads[id].join_status != ThreadJoinStatus::Joinable
576 };
577 if is_ub {
578 throw_ub_format!("trying to detach thread that was already detached or joined");
579 }
580
581 self.threads[id].join_status = ThreadJoinStatus::Detached;
582 interp_ok(())
583 }
584
585 pub fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
587 self.threads[thread].thread_name = Some(new_thread_name);
588 }
589
590 pub fn get_thread_name(&self, thread: ThreadId) -> Option<&[u8]> {
592 self.threads[thread].thread_name()
593 }
594
595 pub fn get_thread_display_name(&self, thread: ThreadId) -> String {
596 self.threads[thread].thread_display_name(thread)
597 }
598
599 fn block_thread(
601 &mut self,
602 reason: BlockReason,
603 timeout: Option<Timeout>,
604 callback: DynUnblockCallback<'tcx>,
605 ) {
606 let state = &mut self.threads[self.active_thread].state;
607 assert!(state.is_enabled());
608 *state = ThreadState::Blocked { reason, timeout, callback }
609 }
610
611 fn yield_active_thread(&mut self) {
613 self.yield_active_thread = true;
617 }
618
619 fn next_callback_wait_time(&self, clock: &MonotonicClock) -> Option<Duration> {
621 self.threads
622 .iter()
623 .filter_map(|t| {
624 match &t.state {
625 ThreadState::Blocked { timeout: Some(timeout), .. } =>
626 Some(timeout.get_wait_time(clock)),
627 _ => None,
628 }
629 })
630 .min()
631 }
632}
633
634impl<'tcx> EvalContextPrivExt<'tcx> for MiriInterpCx<'tcx> {}
635trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
636 #[inline]
638 fn run_timeout_callback(&mut self) -> InterpResult<'tcx> {
639 let this = self.eval_context_mut();
640 let mut found_callback = None;
641 for (id, thread) in this.machine.threads.threads.iter_enumerated_mut() {
643 match &thread.state {
644 ThreadState::Blocked { timeout: Some(timeout), .. }
645 if timeout.get_wait_time(&this.machine.monotonic_clock) == Duration::ZERO =>
646 {
647 let old_state = mem::replace(&mut thread.state, ThreadState::Enabled);
648 let ThreadState::Blocked { callback, .. } = old_state else { unreachable!() };
649 found_callback = Some((id, callback));
650 break;
652 }
653 _ => {}
654 }
655 }
656 if let Some((thread, callback)) = found_callback {
657 let old_thread = this.machine.threads.set_active_thread_id(thread);
664 callback.call(this, UnblockKind::TimedOut)?;
665 this.machine.threads.set_active_thread_id(old_thread);
666 }
667 interp_ok(())
674 }
675
676 #[inline]
677 fn run_on_stack_empty(&mut self) -> InterpResult<'tcx, Poll<()>> {
678 let this = self.eval_context_mut();
679 if let Some(genmc_ctx) = this.machine.data_race.as_genmc_ref() {
681 let thread_id = this.active_thread();
682 genmc_ctx.handle_thread_stack_empty(thread_id);
683 }
684 let mut callback = this
685 .active_thread_mut()
686 .on_stack_empty
687 .take()
688 .expect("`on_stack_empty` not set up, or already running");
689 let res = callback(this)?;
690 this.active_thread_mut().on_stack_empty = Some(callback);
691 interp_ok(res)
692 }
693
694 fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> {
703 let this = self.eval_context_mut();
704 if let Some(genmc_ctx) = this.machine.data_race.as_genmc_ref() {
706 let next_thread_id = genmc_ctx.schedule_thread(this)?;
707
708 let thread_manager = &mut this.machine.threads;
709 thread_manager.active_thread = next_thread_id;
710 thread_manager.yield_active_thread = false;
711
712 assert!(thread_manager.threads[thread_manager.active_thread].state.is_enabled());
713 return interp_ok(SchedulingAction::ExecuteStep);
714 }
715
716 let thread_manager = &mut this.machine.threads;
718 let clock = &this.machine.monotonic_clock;
719 let rng = this.machine.rng.get_mut();
720 if thread_manager.threads[thread_manager.active_thread].state.is_enabled()
722 && !thread_manager.yield_active_thread
723 {
724 return interp_ok(SchedulingAction::ExecuteStep);
726 }
727 let potential_sleep_time = thread_manager.next_callback_wait_time(clock);
734 if potential_sleep_time == Some(Duration::ZERO) {
735 return interp_ok(SchedulingAction::ExecuteTimeoutCallback);
736 }
737 let mut threads_iter = thread_manager
744 .threads
745 .iter_enumerated()
746 .skip(thread_manager.active_thread.index() + 1)
747 .chain(
748 thread_manager
749 .threads
750 .iter_enumerated()
751 .take(thread_manager.active_thread.index() + 1),
752 )
753 .filter(|(_id, thread)| thread.state.is_enabled());
754 let new_thread = if thread_manager.fixed_scheduling {
756 threads_iter.next()
757 } else {
758 threads_iter.choose(rng)
759 };
760
761 if let Some((id, _thread)) = new_thread {
762 if thread_manager.active_thread != id {
763 info!(
764 "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
765 thread_manager.get_thread_display_name(id),
766 thread_manager.get_thread_display_name(thread_manager.active_thread)
767 );
768 thread_manager.active_thread = id;
769 }
770 }
771 thread_manager.yield_active_thread = false;
773
774 if thread_manager.threads[thread_manager.active_thread].state.is_enabled() {
775 return interp_ok(SchedulingAction::ExecuteStep);
776 }
777 if thread_manager.threads.iter().all(|thread| thread.state.is_terminated()) {
779 unreachable!("all threads terminated without the main thread terminating?!");
780 } else if let Some(sleep_time) = potential_sleep_time {
781 interp_ok(SchedulingAction::Sleep(sleep_time))
785 } else {
786 throw_machine_stop!(TerminationInfo::Deadlock);
787 }
788 }
789}
790
791impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
793pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
794 #[inline]
795 fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadNotFound> {
796 self.eval_context_ref().machine.threads.thread_id_try_from(id)
797 }
798
799 fn get_or_create_thread_local_alloc(
802 &mut self,
803 def_id: DefId,
804 ) -> InterpResult<'tcx, StrictPointer> {
805 let this = self.eval_context_mut();
806 let tcx = this.tcx;
807 if let Some(old_alloc) = this.machine.threads.get_thread_local_alloc_id(def_id) {
808 interp_ok(old_alloc)
811 } else {
812 if tcx.is_foreign_item(def_id) {
816 throw_unsup_format!("foreign thread-local statics are not supported");
817 }
818 let params = this.machine.get_default_alloc_params();
819 let alloc = this.ctfe_query(|tcx| tcx.eval_static_initializer(def_id))?;
820 let mut alloc = alloc.inner().adjust_from_tcx(
822 &this.tcx,
823 |bytes, align| {
824 interp_ok(MiriAllocBytes::from_bytes(
825 std::borrow::Cow::Borrowed(bytes),
826 align,
827 params,
828 ))
829 },
830 |ptr| this.global_root_pointer(ptr),
831 )?;
832 alloc.mutability = Mutability::Mut;
834 let ptr = this.insert_allocation(alloc, MiriMemoryKind::Tls.into())?;
836 this.machine.threads.set_thread_local_alloc(def_id, ptr);
837 interp_ok(ptr)
838 }
839 }
840
841 #[inline]
843 fn start_regular_thread(
844 &mut self,
845 thread: Option<MPlaceTy<'tcx>>,
846 start_routine: Pointer,
847 start_abi: ExternAbi,
848 func_arg: ImmTy<'tcx>,
849 ret_layout: TyAndLayout<'tcx>,
850 ) -> InterpResult<'tcx, ThreadId> {
851 let this = self.eval_context_mut();
852
853 let new_thread_id = this.machine.threads.create_thread({
855 let mut state = tls::TlsDtorsState::default();
856 Box::new(move |m| state.on_stack_empty(m))
857 });
858 let current_span = this.machine.current_span();
859 match &mut this.machine.data_race {
860 GlobalDataRaceHandler::None => {}
861 GlobalDataRaceHandler::Vclocks(data_race) =>
862 data_race.thread_created(&this.machine.threads, new_thread_id, current_span),
863 GlobalDataRaceHandler::Genmc(genmc_ctx) =>
864 genmc_ctx.handle_thread_create(&this.machine.threads, new_thread_id)?,
865 }
866 if let Some(thread_info_place) = thread {
869 this.write_scalar(
870 Scalar::from_uint(new_thread_id.to_u32(), thread_info_place.layout.size),
871 &thread_info_place,
872 )?;
873 }
874
875 let old_thread_id = this.machine.threads.set_active_thread_id(new_thread_id);
878
879 if let Some(cpuset) = this.machine.thread_cpu_affinity.get(&old_thread_id).cloned() {
881 this.machine.thread_cpu_affinity.insert(new_thread_id, cpuset);
882 }
883
884 let instance = this.get_ptr_fn(start_routine)?.as_instance()?;
886
887 let ret_place = this.allocate(ret_layout, MiriMemoryKind::Machine.into())?;
891
892 this.call_function(
893 instance,
894 start_abi,
895 &[func_arg],
896 Some(&ret_place),
897 StackPopCleanup::Root { cleanup: true },
898 )?;
899
900 this.machine.threads.set_active_thread_id(old_thread_id);
902
903 interp_ok(new_thread_id)
904 }
905
906 fn terminate_active_thread(&mut self, tls_alloc_action: TlsAllocAction) -> InterpResult<'tcx> {
911 let this = self.eval_context_mut();
912
913 let thread = this.active_thread_mut();
915 assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated");
916 thread.state = ThreadState::Terminated;
917 match &mut this.machine.data_race {
918 GlobalDataRaceHandler::None => {}
919 GlobalDataRaceHandler::Vclocks(data_race) =>
920 data_race.thread_terminated(&this.machine.threads),
921 GlobalDataRaceHandler::Genmc(genmc_ctx) =>
922 genmc_ctx.handle_thread_finish(&this.machine.threads)?,
923 }
924 let gone_thread = this.active_thread();
926 {
927 let mut free_tls_statics = Vec::new();
928 this.machine.threads.thread_local_allocs.retain(|&(_def_id, thread), &mut alloc_id| {
929 if thread != gone_thread {
930 return true;
932 }
933 free_tls_statics.push(alloc_id);
936 false
937 });
938 for ptr in free_tls_statics {
940 match tls_alloc_action {
941 TlsAllocAction::Deallocate =>
942 this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?,
943 TlsAllocAction::Leak =>
944 if let Some(alloc) = ptr.provenance.get_alloc_id() {
945 trace!(
946 "Thread-local static leaked and stored as static root: {:?}",
947 alloc
948 );
949 this.machine.static_roots.push(alloc);
950 },
951 }
952 }
953 }
954 let unblock_reason = BlockReason::Join(gone_thread);
956 let threads = &this.machine.threads.threads;
957 let joining_threads = threads
958 .iter_enumerated()
959 .filter(|(_, thread)| thread.state.is_blocked_on(unblock_reason))
960 .map(|(id, _)| id)
961 .collect::<Vec<_>>();
962 for thread in joining_threads {
963 this.unblock_thread(thread, unblock_reason)?;
964 }
965
966 interp_ok(())
967 }
968
969 #[inline]
972 fn block_thread(
973 &mut self,
974 reason: BlockReason,
975 timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
976 callback: DynUnblockCallback<'tcx>,
977 ) {
978 let this = self.eval_context_mut();
979 let timeout = timeout.map(|(clock, anchor, duration)| {
980 let anchor = match clock {
981 TimeoutClock::RealTime => {
982 assert!(
983 this.machine.communicate(),
984 "cannot have `RealTime` timeout with isolation enabled!"
985 );
986 Timeout::RealTime(match anchor {
987 TimeoutAnchor::Absolute => SystemTime::UNIX_EPOCH,
988 TimeoutAnchor::Relative => SystemTime::now(),
989 })
990 }
991 TimeoutClock::Monotonic =>
992 Timeout::Monotonic(match anchor {
993 TimeoutAnchor::Absolute => this.machine.monotonic_clock.epoch(),
994 TimeoutAnchor::Relative => this.machine.monotonic_clock.now(),
995 }),
996 };
997 anchor.add_lossy(duration)
998 });
999 this.machine.threads.block_thread(reason, timeout, callback);
1000 }
1001
1002 fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) -> InterpResult<'tcx> {
1005 let this = self.eval_context_mut();
1006 let old_state =
1007 mem::replace(&mut this.machine.threads.threads[thread].state, ThreadState::Enabled);
1008 let callback = match old_state {
1009 ThreadState::Blocked { reason: actual_reason, callback, .. } => {
1010 assert_eq!(
1011 reason, actual_reason,
1012 "unblock_thread: thread was blocked for the wrong reason"
1013 );
1014 callback
1015 }
1016 _ => panic!("unblock_thread: thread was not blocked"),
1017 };
1018 let old_thread = this.machine.threads.set_active_thread_id(thread);
1020 callback.call(this, UnblockKind::Ready)?;
1021 this.machine.threads.set_active_thread_id(old_thread);
1022 interp_ok(())
1023 }
1024
1025 #[inline]
1026 fn detach_thread(
1027 &mut self,
1028 thread_id: ThreadId,
1029 allow_terminated_joined: bool,
1030 ) -> InterpResult<'tcx> {
1031 let this = self.eval_context_mut();
1032 this.machine.threads.detach_thread(thread_id, allow_terminated_joined)
1033 }
1034
1035 fn join_thread(
1039 &mut self,
1040 joined_thread_id: ThreadId,
1041 success_retval: Scalar,
1042 return_dest: &MPlaceTy<'tcx>,
1043 ) -> InterpResult<'tcx> {
1044 let this = self.eval_context_mut();
1045 let thread_mgr = &mut this.machine.threads;
1046 if thread_mgr.threads[joined_thread_id].join_status == ThreadJoinStatus::Detached {
1047 throw_ub_format!("trying to join a detached thread");
1049 }
1050
1051 fn after_join<'tcx>(
1052 this: &mut InterpCx<'tcx, MiriMachine<'tcx>>,
1053 joined_thread_id: ThreadId,
1054 success_retval: Scalar,
1055 return_dest: &MPlaceTy<'tcx>,
1056 ) -> InterpResult<'tcx> {
1057 let threads = &this.machine.threads;
1058 match &mut this.machine.data_race {
1059 GlobalDataRaceHandler::None => {}
1060 GlobalDataRaceHandler::Vclocks(data_race) =>
1061 data_race.thread_joined(threads, joined_thread_id),
1062 GlobalDataRaceHandler::Genmc(genmc_ctx) =>
1063 genmc_ctx.handle_thread_join(threads.active_thread, joined_thread_id)?,
1064 }
1065 this.write_scalar(success_retval, return_dest)?;
1066 interp_ok(())
1067 }
1068
1069 thread_mgr.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
1072 if !thread_mgr.threads[joined_thread_id].state.is_terminated() {
1073 trace!(
1074 "{:?} blocked on {:?} when trying to join",
1075 thread_mgr.active_thread, joined_thread_id
1076 );
1077 let dest = return_dest.clone();
1080 thread_mgr.block_thread(
1081 BlockReason::Join(joined_thread_id),
1082 None,
1083 callback!(
1084 @capture<'tcx> {
1085 joined_thread_id: ThreadId,
1086 dest: MPlaceTy<'tcx>,
1087 success_retval: Scalar,
1088 }
1089 |this, unblock: UnblockKind| {
1090 assert_eq!(unblock, UnblockKind::Ready);
1091 after_join(this, joined_thread_id, success_retval, &dest)
1092 }
1093 ),
1094 );
1095 } else {
1096 after_join(this, joined_thread_id, success_retval, return_dest)?;
1098 }
1099 interp_ok(())
1100 }
1101
1102 fn join_thread_exclusive(
1107 &mut self,
1108 joined_thread_id: ThreadId,
1109 success_retval: Scalar,
1110 return_dest: &MPlaceTy<'tcx>,
1111 ) -> InterpResult<'tcx> {
1112 let this = self.eval_context_mut();
1113 let threads = &this.machine.threads.threads;
1114 if threads[joined_thread_id].join_status == ThreadJoinStatus::Joined {
1115 throw_ub_format!("trying to join an already joined thread");
1116 }
1117
1118 if joined_thread_id == this.machine.threads.active_thread {
1119 throw_ub_format!("trying to join itself");
1120 }
1121
1122 assert!(
1124 threads
1125 .iter()
1126 .all(|thread| { !thread.state.is_blocked_on(BlockReason::Join(joined_thread_id)) }),
1127 "this thread already has threads waiting for its termination"
1128 );
1129
1130 this.join_thread(joined_thread_id, success_retval, return_dest)
1131 }
1132
1133 #[inline]
1134 fn active_thread(&self) -> ThreadId {
1135 let this = self.eval_context_ref();
1136 this.machine.threads.active_thread()
1137 }
1138
1139 #[inline]
1140 fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
1141 let this = self.eval_context_mut();
1142 this.machine.threads.active_thread_mut()
1143 }
1144
1145 #[inline]
1146 fn active_thread_ref(&self) -> &Thread<'tcx> {
1147 let this = self.eval_context_ref();
1148 this.machine.threads.active_thread_ref()
1149 }
1150
1151 #[inline]
1152 fn get_total_thread_count(&self) -> usize {
1153 let this = self.eval_context_ref();
1154 this.machine.threads.get_total_thread_count()
1155 }
1156
1157 #[inline]
1158 fn have_all_terminated(&self) -> bool {
1159 let this = self.eval_context_ref();
1160 this.machine.threads.have_all_terminated()
1161 }
1162
1163 #[inline]
1164 fn enable_thread(&mut self, thread_id: ThreadId) {
1165 let this = self.eval_context_mut();
1166 this.machine.threads.enable_thread(thread_id);
1167 }
1168
1169 #[inline]
1170 fn active_thread_stack<'a>(&'a self) -> &'a [Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
1171 let this = self.eval_context_ref();
1172 this.machine.threads.active_thread_stack()
1173 }
1174
1175 #[inline]
1176 fn active_thread_stack_mut<'a>(
1177 &'a mut self,
1178 ) -> &'a mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
1179 let this = self.eval_context_mut();
1180 this.machine.threads.active_thread_stack_mut()
1181 }
1182
1183 #[inline]
1185 fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
1186 self.eval_context_mut().machine.threads.set_thread_name(thread, new_thread_name);
1187 }
1188
1189 #[inline]
1190 fn get_thread_name<'c>(&'c self, thread: ThreadId) -> Option<&'c [u8]>
1191 where
1192 'tcx: 'c,
1193 {
1194 self.eval_context_ref().machine.threads.get_thread_name(thread)
1195 }
1196
1197 #[inline]
1198 fn yield_active_thread(&mut self) {
1199 self.eval_context_mut().machine.threads.yield_active_thread();
1200 }
1201
1202 #[inline]
1203 fn maybe_preempt_active_thread(&mut self) {
1204 use rand::Rng as _;
1205
1206 let this = self.eval_context_mut();
1207 if !this.machine.threads.fixed_scheduling
1208 && this.machine.rng.get_mut().random_bool(this.machine.preemption_rate)
1209 {
1210 this.yield_active_thread();
1211 }
1212 }
1213
1214 fn run_threads(&mut self) -> InterpResult<'tcx, !> {
1217 let this = self.eval_context_mut();
1218 loop {
1219 if CTRL_C_RECEIVED.load(Relaxed) {
1220 this.machine.handle_abnormal_termination();
1221 throw_machine_stop!(TerminationInfo::Interrupted);
1222 }
1223 match this.schedule()? {
1224 SchedulingAction::ExecuteStep => {
1225 if !this.step()? {
1226 match this.run_on_stack_empty()? {
1228 Poll::Pending => {} Poll::Ready(()) =>
1230 this.terminate_active_thread(TlsAllocAction::Deallocate)?,
1231 }
1232 }
1233 }
1234 SchedulingAction::ExecuteTimeoutCallback => {
1235 this.run_timeout_callback()?;
1236 }
1237 SchedulingAction::Sleep(duration) => {
1238 this.machine.monotonic_clock.sleep(duration);
1239 }
1240 }
1241 }
1242 }
1243}