1use std::mem;
4use std::sync::atomic::Ordering::Relaxed;
5use std::task::Poll;
6use std::time::{Duration, SystemTime};
7
8use rand::seq::IteratorRandom;
9use rustc_abi::ExternAbi;
10use rustc_const_eval::CTRL_C_RECEIVED;
11use rustc_data_structures::either::Either;
12use rustc_data_structures::fx::FxHashMap;
13use rustc_hir::def_id::DefId;
14use rustc_index::{Idx, IndexVec};
15use rustc_middle::mir::Mutability;
16use rustc_middle::ty::layout::TyAndLayout;
17use rustc_span::Span;
18use rustc_target::spec::Os;
19
20use crate::concurrency::GlobalDataRaceHandler;
21use crate::shims::tls;
22use crate::*;
23
/// What the scheduler (`schedule`) tells the main loop (`run_threads`) to do next.
#[derive(Clone, Copy, Debug, PartialEq)]
enum SchedulingAction {
    /// Execute one interpreter step on the currently active thread.
    ExecuteStep,
    /// Run the unblock callback of a thread whose timeout has expired.
    ExecuteTimeoutCallback,
    /// No thread can run right now, but a timeout is pending: sleep on the machine's
    /// monotonic clock for the given duration.
    Sleep(Duration),
}
33
/// What `terminate_active_thread` does with the thread-local statics of the thread that
/// is being terminated.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum TlsAllocAction {
    /// Deallocate the memory backing each thread-local static of the thread.
    Deallocate,
    /// Keep the memory alive and record its allocation in the machine's `static_roots`
    /// instead of deallocating it.
    Leak,
}
43
/// Tells an unblock callback why the blocked thread is being woken up.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UnblockKind {
    /// The thread was woken up explicitly through `unblock_thread`.
    Ready,
    /// The timeout that was registered when the thread blocked has expired
    /// (passed by `run_timeout_callback`).
    TimedOut,
}
52
/// The callback stored with a blocked thread; it is invoked with the [`UnblockKind`]
/// describing why the thread got unblocked.
pub type DynUnblockCallback<'tcx> = DynMachineCallback<'tcx, UnblockKind>;
56
/// A thread identifier: a thin wrapper around the raw `u32` id.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ThreadId(u32);

impl ThreadId {
    /// The id of the main thread, which is always thread `0`.
    pub const MAIN_THREAD: ThreadId = Self(0);

    /// Wraps a raw `u32` in a `ThreadId` without checking that such a thread exists.
    pub fn new_unchecked(id: u32) -> Self {
        ThreadId(id)
    }

    /// Returns the raw numeric value of this id.
    pub fn to_u32(self) -> u32 {
        let ThreadId(raw) = self;
        raw
    }
}
73
74impl Idx for ThreadId {
75 fn new(idx: usize) -> Self {
76 ThreadId(u32::try_from(idx).unwrap())
77 }
78
79 fn index(self) -> usize {
80 usize::try_from(self.0).unwrap()
81 }
82}
83
84impl From<ThreadId> for u64 {
85 fn from(t: ThreadId) -> Self {
86 t.0.into()
87 }
88}
89
/// Records what a blocked thread is waiting for.
///
/// `unblock_thread` asserts that the reason it is given matches the reason stored here.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum BlockReason {
    /// Waiting for the thread with the given id to terminate. Set up by `join_thread`;
    /// `terminate_active_thread` unblocks every thread that waits with this reason.
    Join(ThreadId),
    // NOTE(review): the variants from `Sleep` through `UnnamedSocket` are only *named*
    // in this file; the code that blocks with them lives elsewhere. Judging by the
    // names, each one identifies the primitive or shim the thread is waiting on —
    // confirm against the users.
    Sleep,
    Mutex,
    Condvar,
    RwLock,
    Futex,
    InitOnce,
    Epoll,
    Eventfd,
    UnnamedSocket,
    /// Blocked on behalf of GenMC. When GenMC picks such a thread to run next,
    /// `schedule` unblocks it implicitly.
    Genmc,
}
118
/// The scheduling state of a thread.
enum ThreadState<'tcx> {
    /// The thread may be picked by the scheduler and executed.
    Enabled,
    /// The thread is waiting for `reason`. `callback` is invoked when the thread gets
    /// unblocked, either explicitly (`unblock_thread`) or because `timeout`, if any,
    /// has expired (`run_timeout_callback`).
    Blocked { reason: BlockReason, timeout: Option<Timeout>, callback: DynUnblockCallback<'tcx> },
    /// The thread has finished; set by `terminate_active_thread`.
    Terminated,
}
129
130impl<'tcx> std::fmt::Debug for ThreadState<'tcx> {
131 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132 match self {
133 Self::Enabled => write!(f, "Enabled"),
134 Self::Blocked { reason, timeout, .. } =>
135 f.debug_struct("Blocked").field("reason", reason).field("timeout", timeout).finish(),
136 Self::Terminated => write!(f, "Terminated"),
137 }
138 }
139}
140
141impl<'tcx> ThreadState<'tcx> {
142 fn is_enabled(&self) -> bool {
143 matches!(self, ThreadState::Enabled)
144 }
145
146 fn is_terminated(&self) -> bool {
147 matches!(self, ThreadState::Terminated)
148 }
149
150 fn is_blocked_on(&self, reason: BlockReason) -> bool {
151 matches!(*self, ThreadState::Blocked { reason: actual_reason, .. } if actual_reason == reason)
152 }
153}
154
/// Tracks whether a thread can still be joined or detached.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum ThreadJoinStatus {
    /// The thread can be joined or detached; this is the status of a freshly created
    /// thread.
    Joinable,
    /// The thread has been detached. Joining it or detaching it again is reported as
    /// undefined behavior.
    Detached,
    /// Some thread has started joining this thread (set by `join_thread`).
    Joined,
}
166
/// A single interpreted thread.
pub struct Thread<'tcx> {
    /// Scheduling state: enabled, blocked, or terminated.
    state: ThreadState<'tcx>,

    /// Name of the thread as raw bytes; `None` while the thread is unnamed.
    thread_name: Option<Vec<u8>>,

    /// The call stack of this thread.
    stack: Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>>,

    /// Callback that `run_on_stack_empty` invokes when a step reports that the thread
    /// has nothing left to execute. It is taken out of this field while it runs and
    /// put back afterwards.
    pub(crate) on_stack_empty: Option<StackEmptyCallback<'tcx>>,

    /// Cached index into `stack` of the topmost frame with the highest user relevance.
    /// When `None`, `top_user_relevant_frame()` falls back to the top of the stack.
    top_user_relevant_frame: Option<usize>,

    /// Whether the thread can still be joined or detached.
    join_status: ThreadJoinStatus,

    /// Unwind payloads of this thread; visited for provenance.
    /// NOTE(review): presumably one entry per in-flight panic — the code that pushes
    /// and pops them is not in this file; confirm there.
    pub(crate) unwind_payloads: Vec<ImmTy<'tcx>>,

    /// NOTE(review): presumably the place holding this thread's "last error" value
    /// (e.g. `errno`), allocated lazily — the users are not in this file; confirm.
    pub(crate) last_error: Option<MPlaceTy<'tcx>>,
}
205
/// Callback run by the main loop when a thread has nothing left to execute.
///
/// When it returns `Poll::Ready(())`, `run_threads` terminates the thread; when it
/// returns `Poll::Pending`, the thread stays alive and keeps being scheduled.
pub type StackEmptyCallback<'tcx> =
    Box<dyn FnMut(&mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Poll<()>> + 'tcx>;
208
209impl<'tcx> Thread<'tcx> {
210 fn thread_name(&self) -> Option<&[u8]> {
212 self.thread_name.as_deref()
213 }
214
215 pub fn is_enabled(&self) -> bool {
217 self.state.is_enabled()
218 }
219
220 fn thread_display_name(&self, id: ThreadId) -> String {
222 if let Some(ref thread_name) = self.thread_name {
223 String::from_utf8_lossy(thread_name).into_owned()
224 } else {
225 format!("unnamed-{}", id.index())
226 }
227 }
228
229 fn compute_top_user_relevant_frame(&self, skip: usize) -> Option<usize> {
235 let mut best = None;
237 for (idx, frame) in self.stack.iter().enumerate().rev().skip(skip) {
238 let relevance = frame.extra.user_relevance;
239 if relevance == u8::MAX {
240 return Some(idx);
242 }
243 if best.is_none_or(|(_best_idx, best_relevance)| best_relevance < relevance) {
244 best = Some((idx, relevance));
247 }
248 }
249 best.map(|(idx, _relevance)| idx)
250 }
251
252 pub fn recompute_top_user_relevant_frame(&mut self, skip: usize) {
255 self.top_user_relevant_frame = self.compute_top_user_relevant_frame(skip);
256 }
257
258 pub fn set_top_user_relevant_frame(&mut self, frame_idx: usize) {
261 debug_assert_eq!(Some(frame_idx), self.compute_top_user_relevant_frame(0));
262 self.top_user_relevant_frame = Some(frame_idx);
263 }
264
265 pub fn top_user_relevant_frame(&self) -> Option<usize> {
268 self.top_user_relevant_frame.or_else(|| self.stack.len().checked_sub(1))
272 }
273
274 pub fn current_user_relevance(&self) -> u8 {
275 self.top_user_relevant_frame()
276 .map(|frame_idx| self.stack[frame_idx].extra.user_relevance)
277 .unwrap_or(0)
278 }
279
280 pub fn current_user_relevant_span(&self) -> Span {
281 debug_assert_eq!(self.top_user_relevant_frame, self.compute_top_user_relevant_frame(0));
282 self.top_user_relevant_frame()
283 .map(|frame_idx| self.stack[frame_idx].current_span())
284 .unwrap_or(rustc_span::DUMMY_SP)
285 }
286}
287
288impl<'tcx> std::fmt::Debug for Thread<'tcx> {
289 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
290 write!(
291 f,
292 "{}({:?}, {:?})",
293 String::from_utf8_lossy(self.thread_name().unwrap_or(b"<unnamed>")),
294 self.state,
295 self.join_status
296 )
297 }
298}
299
300impl<'tcx> Thread<'tcx> {
301 fn new(name: Option<&str>, on_stack_empty: Option<StackEmptyCallback<'tcx>>) -> Self {
302 Self {
303 state: ThreadState::Enabled,
304 thread_name: name.map(|name| Vec::from(name.as_bytes())),
305 stack: Vec::new(),
306 top_user_relevant_frame: None,
307 join_status: ThreadJoinStatus::Joinable,
308 unwind_payloads: Vec::new(),
309 last_error: None,
310 on_stack_empty,
311 }
312 }
313}
314
315impl VisitProvenance for Thread<'_> {
316 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
317 let Thread {
318 unwind_payloads: panic_payload,
319 last_error,
320 stack,
321 top_user_relevant_frame: _,
322 state: _,
323 thread_name: _,
324 join_status: _,
325 on_stack_empty: _, } = self;
327
328 for payload in panic_payload {
329 payload.visit_provenance(visit);
330 }
331 last_error.visit_provenance(visit);
332 for frame in stack {
333 frame.visit_provenance(visit)
334 }
335 }
336}
337
338impl VisitProvenance for Frame<'_, Provenance, FrameExtra<'_>> {
339 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
340 let Frame {
341 return_place,
342 locals,
343 extra,
344 ..
346 } = self;
347
348 return_place.visit_provenance(visit);
350 for local in locals.iter() {
352 match local.as_mplace_or_imm() {
353 None => {}
354 Some(Either::Left((ptr, meta))) => {
355 ptr.visit_provenance(visit);
356 meta.visit_provenance(visit);
357 }
358 Some(Either::Right(imm)) => {
359 imm.visit_provenance(visit);
360 }
361 }
362 }
363
364 extra.visit_provenance(visit);
365 }
366}
367
/// The absolute point in time at which a blocked thread times out.
#[derive(Debug)]
enum Timeout {
    /// A deadline measured on the machine's monotonic clock.
    Monotonic(Instant),
    /// A deadline measured on the host's system clock.
    RealTime(SystemTime),
}
374
375impl Timeout {
376 fn get_wait_time(&self, clock: &MonotonicClock) -> Duration {
378 match self {
379 Timeout::Monotonic(instant) => instant.duration_since(clock.now()),
380 Timeout::RealTime(time) =>
381 time.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO),
382 }
383 }
384
385 fn add_lossy(&self, duration: Duration) -> Self {
387 match self {
388 Timeout::Monotonic(i) => Timeout::Monotonic(i.add_lossy(duration)),
389 Timeout::RealTime(s) => {
390 Timeout::RealTime(
392 s.checked_add(duration)
393 .unwrap_or_else(|| s.checked_add(Duration::from_secs(3600)).unwrap()),
394 )
395 }
396 }
397 }
398}
399
/// The clock a timeout passed to `block_thread` is measured against.
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum TimeoutClock {
    /// The machine's monotonic clock.
    Monotonic,
    /// The host's system clock. `block_thread` asserts that the machine is allowed to
    /// communicate with the host (isolation disabled) when this is used.
    RealTime,
}
406
/// The point in time a timeout duration passed to `block_thread` is counted from.
#[derive(Debug, Copy, Clone)]
pub enum TimeoutAnchor {
    /// The duration is counted from "now" on the chosen clock.
    Relative,
    /// The duration is counted from the chosen clock's epoch (`SystemTime::UNIX_EPOCH`
    /// for the real-time clock, `MonotonicClock::epoch()` for the monotonic one).
    Absolute,
}
413
/// Error returned by `thread_id_try_from` when the given value does not identify an
/// existing thread.
#[derive(Debug, Copy, Clone)]
pub struct ThreadNotFound;
417
/// The set of all threads of the interpreted program, plus the scheduler's state.
#[derive(Debug)]
pub struct ThreadManager<'tcx> {
    /// Id of the thread that is currently executing.
    active_thread: ThreadId,
    /// All threads created so far, indexed by their id. Terminated threads stay in
    /// this list.
    threads: IndexVec<ThreadId, Thread<'tcx>>,
    /// Maps a thread-local static (`DefId`) and a thread to the pointer of that
    /// thread's instance of the static.
    thread_local_allocs: FxHashMap<(DefId, ThreadId), StrictPointer>,
    /// Set when the active thread asked to yield; cleared by the scheduler once it has
    /// picked the next thread.
    yield_active_thread: bool,
    /// Copied from `MiriConfig::fixed_scheduling`. When set, the scheduler picks the
    /// next enabled thread in round-robin order instead of randomly, and
    /// `maybe_preempt_active_thread` never preempts.
    fixed_scheduling: bool,
}
435
436impl VisitProvenance for ThreadManager<'_> {
437 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
438 let ThreadManager {
439 threads,
440 thread_local_allocs,
441 active_thread: _,
442 yield_active_thread: _,
443 fixed_scheduling: _,
444 } = self;
445
446 for thread in threads {
447 thread.visit_provenance(visit);
448 }
449 for ptr in thread_local_allocs.values() {
450 ptr.visit_provenance(visit);
451 }
452 }
453}
454
455impl<'tcx> ThreadManager<'tcx> {
456 pub(crate) fn new(config: &MiriConfig) -> Self {
457 let mut threads = IndexVec::new();
458 threads.push(Thread::new(Some("main"), None));
460 Self {
461 active_thread: ThreadId::MAIN_THREAD,
462 threads,
463 thread_local_allocs: Default::default(),
464 yield_active_thread: false,
465 fixed_scheduling: config.fixed_scheduling,
466 }
467 }
468
469 pub(crate) fn init(
470 ecx: &mut MiriInterpCx<'tcx>,
471 on_main_stack_empty: StackEmptyCallback<'tcx>,
472 ) {
473 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].on_stack_empty =
474 Some(on_main_stack_empty);
475 if ecx.tcx.sess.target.os != Os::Windows {
476 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].join_status =
478 ThreadJoinStatus::Detached;
479 }
480 }
481
482 pub fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadNotFound> {
483 if let Ok(id) = id.try_into()
484 && usize::try_from(id).is_ok_and(|id| id < self.threads.len())
485 {
486 Ok(ThreadId(id))
487 } else {
488 Err(ThreadNotFound)
489 }
490 }
491
492 fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option<StrictPointer> {
495 self.thread_local_allocs.get(&(def_id, self.active_thread)).cloned()
496 }
497
498 fn set_thread_local_alloc(&mut self, def_id: DefId, ptr: StrictPointer) {
503 self.thread_local_allocs.try_insert((def_id, self.active_thread), ptr).unwrap();
504 }
505
506 pub fn active_thread_stack(&self) -> &[Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
508 &self.threads[self.active_thread].stack
509 }
510
511 pub fn active_thread_stack_mut(
513 &mut self,
514 ) -> &mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
515 &mut self.threads[self.active_thread].stack
516 }
517
518 pub fn all_blocked_stacks(
519 &self,
520 ) -> impl Iterator<Item = (ThreadId, &[Frame<'tcx, Provenance, FrameExtra<'tcx>>])> {
521 self.threads
522 .iter_enumerated()
523 .filter(|(_id, t)| matches!(t.state, ThreadState::Blocked { .. }))
524 .map(|(id, t)| (id, &t.stack[..]))
525 }
526
527 fn create_thread(&mut self, on_stack_empty: StackEmptyCallback<'tcx>) -> ThreadId {
529 let new_thread_id = ThreadId::new(self.threads.len());
530 self.threads.push(Thread::new(None, Some(on_stack_empty)));
531 new_thread_id
532 }
533
534 fn set_active_thread_id(&mut self, id: ThreadId) -> ThreadId {
536 assert!(id.index() < self.threads.len());
537 info!(
538 "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
539 self.get_thread_display_name(id),
540 self.get_thread_display_name(self.active_thread)
541 );
542 std::mem::replace(&mut self.active_thread, id)
543 }
544
545 pub fn active_thread(&self) -> ThreadId {
547 self.active_thread
548 }
549
550 pub fn get_total_thread_count(&self) -> usize {
552 self.threads.len()
553 }
554
555 pub fn get_live_thread_count(&self) -> usize {
558 self.threads.iter().filter(|t| !t.state.is_terminated()).count()
559 }
560
561 fn has_terminated(&self, thread_id: ThreadId) -> bool {
563 self.threads[thread_id].state.is_terminated()
564 }
565
566 fn have_all_terminated(&self) -> bool {
568 self.threads.iter().all(|thread| thread.state.is_terminated())
569 }
570
571 fn enable_thread(&mut self, thread_id: ThreadId) {
573 assert!(self.has_terminated(thread_id));
574 self.threads[thread_id].state = ThreadState::Enabled;
575 }
576
577 pub fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
579 &mut self.threads[self.active_thread]
580 }
581
582 pub fn active_thread_ref(&self) -> &Thread<'tcx> {
584 &self.threads[self.active_thread]
585 }
586
587 fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> {
596 trace!("detaching {:?}", id);
598
599 let is_ub = if allow_terminated_joined && self.threads[id].state.is_terminated() {
600 self.threads[id].join_status == ThreadJoinStatus::Detached
602 } else {
603 self.threads[id].join_status != ThreadJoinStatus::Joinable
604 };
605 if is_ub {
606 throw_ub_format!("trying to detach thread that was already detached or joined");
607 }
608
609 self.threads[id].join_status = ThreadJoinStatus::Detached;
610 interp_ok(())
611 }
612
613 pub fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
615 self.threads[thread].thread_name = Some(new_thread_name);
616 }
617
618 pub fn get_thread_name(&self, thread: ThreadId) -> Option<&[u8]> {
620 self.threads[thread].thread_name()
621 }
622
623 pub fn get_thread_display_name(&self, thread: ThreadId) -> String {
624 self.threads[thread].thread_display_name(thread)
625 }
626
627 fn block_thread(
629 &mut self,
630 reason: BlockReason,
631 timeout: Option<Timeout>,
632 callback: DynUnblockCallback<'tcx>,
633 ) {
634 let state = &mut self.threads[self.active_thread].state;
635 assert!(state.is_enabled());
636 *state = ThreadState::Blocked { reason, timeout, callback }
637 }
638
639 fn yield_active_thread(&mut self) {
641 self.yield_active_thread = true;
645 }
646
647 fn next_callback_wait_time(&self, clock: &MonotonicClock) -> Option<Duration> {
649 self.threads
650 .iter()
651 .filter_map(|t| {
652 match &t.state {
653 ThreadState::Blocked { timeout: Some(timeout), .. } =>
654 Some(timeout.get_wait_time(clock)),
655 _ => None,
656 }
657 })
658 .min()
659 }
660}
661
// Scheduler internals that are private to this file.
impl<'tcx> EvalContextPrivExt<'tcx> for MiriInterpCx<'tcx> {}
trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
    /// Runs the unblock callback of the first thread (in id order) whose timeout has
    /// expired, passing it `UnblockKind::TimedOut`.
    ///
    /// Does nothing if no blocked thread has a remaining wait time of zero.
    #[inline]
    fn run_timeout_callback(&mut self) -> InterpResult<'tcx> {
        let this = self.eval_context_mut();
        let mut found_callback = None;
        // Find one thread with an expired timeout and take its callback out of its state.
        for (id, thread) in this.machine.threads.threads.iter_enumerated_mut() {
            match &thread.state {
                ThreadState::Blocked { timeout: Some(timeout), .. }
                    if timeout.get_wait_time(&this.machine.monotonic_clock) == Duration::ZERO =>
                {
                    // The thread becomes enabled *before* its callback runs, so the
                    // callback is free to block the thread again.
                    let old_state = mem::replace(&mut thread.state, ThreadState::Enabled);
                    let ThreadState::Blocked { callback, .. } = old_state else { unreachable!() };
                    found_callback = Some((id, callback));
                    // Only one callback is run per call.
                    break;
                }
                _ => {}
            }
        }
        if let Some((thread, callback)) = found_callback {
            // Run the callback with the timed-out thread as the active thread, then
            // switch back to the thread that was active before.
            let old_thread = this.machine.threads.set_active_thread_id(thread);
            callback.call(this, UnblockKind::TimedOut)?;
            this.machine.threads.set_active_thread_id(old_thread);
        }
        interp_ok(())
    }

    /// Runs the active thread's `on_stack_empty` callback and returns its result.
    ///
    /// The callback is taken out of the thread while it runs and put back afterwards.
    /// Panics if the callback is missing, which means it was never set up or is
    /// already running.
    #[inline]
    fn run_on_stack_empty(&mut self) -> InterpResult<'tcx, Poll<()>> {
        let this = self.eval_context_mut();
        let mut callback = this
            .active_thread_mut()
            .on_stack_empty
            .take()
            .expect("`on_stack_empty` not set up, or already running");
        let res = callback(this)?;
        // Note: if the callback returned an error, the `?` above has already returned
        // and the callback is not put back.
        this.active_thread_mut().on_stack_empty = Some(callback);
        interp_ok(res)
    }

    /// Decides what the main loop should do next, possibly switching the active thread.
    ///
    /// Returns `ExecuteStep` when the (possibly new) active thread can run,
    /// `ExecuteTimeoutCallback` when some timeout has already expired, and `Sleep`
    /// when nothing can run but a timeout is pending. Stops the machine with
    /// `TerminationInfo::Deadlock` when no thread is enabled and no timeout is pending.
    fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> {
        let this = self.eval_context_mut();

        // In GenMC mode, GenMC decides which thread runs next.
        if this.machine.data_race.as_genmc_ref().is_some() {
            loop {
                let genmc_ctx = this.machine.data_race.as_genmc_ref().unwrap();
                // If GenMC does not name a thread, keep stepping the current one.
                let Some(next_thread_id) = genmc_ctx.schedule_thread(this)? else {
                    return interp_ok(SchedulingAction::ExecuteStep);
                };
                // A thread blocked with `BlockReason::Genmc` is unblocked implicitly
                // when GenMC schedules it.
                if this.machine.threads.threads[next_thread_id]
                    .state
                    .is_blocked_on(BlockReason::Genmc)
                {
                    info!(
                        "GenMC: scheduling blocked thread {next_thread_id:?}, so we unblock it now."
                    );
                    this.unblock_thread(next_thread_id, BlockReason::Genmc)?;
                }
                // If the chosen thread is not enabled at this point (for instance
                // because its unblock callback blocked it again), loop and ask GenMC
                // for another thread.
                let thread_manager = &mut this.machine.threads;
                if thread_manager.threads[next_thread_id].state.is_enabled() {
                    thread_manager.active_thread = next_thread_id;
                    return interp_ok(SchedulingAction::ExecuteStep);
                }
            }
        }

        // Regular (non-GenMC) scheduling starts here.
        let thread_manager = &mut this.machine.threads;
        let clock = &this.machine.monotonic_clock;
        let rng = this.machine.rng.get_mut();
        // Fast path: the active thread can keep going and did not ask to yield.
        if thread_manager.threads[thread_manager.active_thread].state.is_enabled()
            && !thread_manager.yield_active_thread
        {
            return interp_ok(SchedulingAction::ExecuteStep);
        }
        // The active thread blocked, terminated or yielded. Expired timeouts take
        // priority over picking another thread.
        let potential_sleep_time = thread_manager.next_callback_wait_time(clock);
        if potential_sleep_time == Some(Duration::ZERO) {
            return interp_ok(SchedulingAction::ExecuteTimeoutCallback);
        }
        // Candidate threads in rotated order: first the threads after the active one,
        // then the threads before it, and finally the active thread itself. Taking the
        // first enabled candidate therefore yields round-robin scheduling.
        let mut threads_iter = thread_manager
            .threads
            .iter_enumerated()
            .skip(thread_manager.active_thread.index() + 1)
            .chain(
                thread_manager
                    .threads
                    .iter_enumerated()
                    .take(thread_manager.active_thread.index() + 1),
            )
            .filter(|(_id, thread)| thread.state.is_enabled());
        // With fixed scheduling, take the first enabled candidate; otherwise choose one
        // of the enabled candidates at random.
        let new_thread = if thread_manager.fixed_scheduling {
            threads_iter.next()
        } else {
            threads_iter.choose(rng)
        };

        if let Some((id, _thread)) = new_thread {
            if thread_manager.active_thread != id {
                info!(
                    "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
                    thread_manager.get_thread_display_name(id),
                    thread_manager.get_thread_display_name(thread_manager.active_thread)
                );
                thread_manager.active_thread = id;
            }
        }
        // The yield request has been honored (or there was nothing to switch to).
        thread_manager.yield_active_thread = false;

        if thread_manager.threads[thread_manager.active_thread].state.is_enabled() {
            return interp_ok(SchedulingAction::ExecuteStep);
        }
        // No thread is enabled: either wait for the next timeout or report a deadlock.
        if thread_manager.threads.iter().all(|thread| thread.state.is_terminated()) {
            unreachable!("all threads terminated without the main thread terminating?!");
        } else if let Some(sleep_time) = potential_sleep_time {
            interp_ok(SchedulingAction::Sleep(sleep_time))
        } else {
            throw_machine_stop!(TerminationInfo::Deadlock);
        }
    }
}
830
// Public interface to the thread manager, implemented for the interpreter context.
impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
    /// Converts `id` into a `ThreadId` if it names an existing thread; see
    /// `ThreadManager::thread_id_try_from`.
    #[inline]
    fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadNotFound> {
        self.eval_context_ref().machine.threads.thread_id_try_from(id)
    }

    /// Returns the pointer to the active thread's instance of the thread-local static
    /// `def_id`, creating and initializing that instance on first use.
    ///
    /// Fails with an "unsupported" error for foreign thread-local statics.
    fn get_or_create_thread_local_alloc(
        &mut self,
        def_id: DefId,
    ) -> InterpResult<'tcx, StrictPointer> {
        let this = self.eval_context_mut();
        let tcx = this.tcx;
        if let Some(old_alloc) = this.machine.threads.get_thread_local_alloc_id(def_id) {
            // This thread already has an instance of the static.
            interp_ok(old_alloc)
        } else {
            if tcx.is_foreign_item(def_id) {
                throw_unsup_format!("foreign thread-local statics are not supported");
            }
            let params = this.machine.get_default_alloc_params();
            // Evaluate the static's initializer and turn the result into an allocation
            // owned by this machine.
            let alloc = this.ctfe_query(|tcx| tcx.eval_static_initializer(def_id))?;
            let mut alloc = alloc.inner().adjust_from_tcx(
                &this.tcx,
                |bytes, align| {
                    interp_ok(MiriAllocBytes::from_bytes(
                        std::borrow::Cow::Borrowed(bytes),
                        align,
                        params,
                    ))
                },
                |ptr| this.global_root_pointer(ptr),
            )?;
            // The per-thread instance is always made mutable, whatever the mutability
            // of the evaluated initializer was.
            alloc.mutability = Mutability::Mut;
            let ptr = this.insert_allocation(alloc, MiriMemoryKind::Tls.into())?;
            // Remember the instance so that later accesses from this thread reuse it.
            this.machine.threads.set_thread_local_alloc(def_id, ptr);
            interp_ok(ptr)
        }
    }

    /// Creates a new thread and sets it up to call `start_routine` with `func_arg`.
    ///
    /// If `thread` is given, the id of the new thread is written to that place.
    /// Returns the id of the new thread. On successful return the active thread is
    /// the same as on entry.
    #[inline]
    fn start_regular_thread(
        &mut self,
        thread: Option<MPlaceTy<'tcx>>,
        start_routine: Pointer,
        start_abi: ExternAbi,
        func_arg: ImmTy<'tcx>,
        ret_layout: TyAndLayout<'tcx>,
    ) -> InterpResult<'tcx, ThreadId> {
        let this = self.eval_context_mut();

        // Create the thread. Its `on_stack_empty` callback drives the TLS destructor
        // state machine.
        let new_thread_id = this.machine.threads.create_thread({
            let mut state = tls::TlsDtorsState::default();
            Box::new(move |m| state.on_stack_empty(m))
        });
        let current_span = this.machine.current_user_relevant_span();
        // Tell the data race handler, if any, about the new thread.
        match &mut this.machine.data_race {
            GlobalDataRaceHandler::None => {}
            GlobalDataRaceHandler::Vclocks(data_race) =>
                data_race.thread_created(&this.machine.threads, new_thread_id, current_span),
            GlobalDataRaceHandler::Genmc(genmc_ctx) =>
                genmc_ctx.handle_thread_create(
                    &this.machine.threads,
                    start_routine,
                    &func_arg,
                    new_thread_id,
                )?,
        }
        // Write the new thread id to the caller-provided place. This happens while the
        // creating thread is still the active thread.
        if let Some(thread_info_place) = thread {
            this.write_scalar(
                Scalar::from_uint(new_thread_id.to_u32(), thread_info_place.layout.size),
                &thread_info_place,
            )?;
        }

        // Temporarily make the new thread active so that the setup below applies to it.
        let old_thread_id = this.machine.threads.set_active_thread_id(new_thread_id);

        // The new thread inherits the CPU affinity of the creating thread, if recorded.
        if let Some(cpuset) = this.machine.thread_cpu_affinity.get(&old_thread_id).cloned() {
            this.machine.thread_cpu_affinity.insert(new_thread_id, cpuset);
        }

        let instance = this.get_ptr_fn(start_routine)?.as_instance()?;

        // Allocate a place for the start routine's return value.
        let ret_place = this.allocate(ret_layout, MiriMemoryKind::Machine.into())?;

        // Set up the call to the start routine on the new thread's stack.
        this.call_function(
            instance,
            start_abi,
            &[func_arg],
            Some(&ret_place),
            ReturnContinuation::Stop { cleanup: true },
        )?;

        // Switch back to the creating thread.
        this.machine.threads.set_active_thread_id(old_thread_id);

        interp_ok(new_thread_id)
    }

    /// Terminates the active thread, whose stack must be empty.
    ///
    /// The thread's thread-local statics are deallocated or leaked according to
    /// `tls_alloc_action`, the data race handler is notified, and every thread that is
    /// blocked on joining this thread gets unblocked.
    fn terminate_active_thread(&mut self, tls_alloc_action: TlsAllocAction) -> InterpResult<'tcx> {
        let this = self.eval_context_mut();

        let thread = this.active_thread_mut();
        assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated");
        thread.state = ThreadState::Terminated;

        let gone_thread = this.active_thread();
        {
            // Remove the terminated thread's entries from the TLS map, collecting their
            // pointers so they can be handled once the map is no longer borrowed.
            let mut free_tls_statics = Vec::new();
            this.machine.threads.thread_local_allocs.retain(|&(_def_id, thread), &mut alloc_id| {
                if thread != gone_thread {
                    // Belongs to another thread: keep it.
                    return true;
                }
                free_tls_statics.push(alloc_id);
                false
            });
            for ptr in free_tls_statics {
                match tls_alloc_action {
                    TlsAllocAction::Deallocate =>
                        this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?,
                    TlsAllocAction::Leak =>
                        if let Some(alloc) = ptr.provenance.get_alloc_id() {
                            trace!(
                                "Thread-local static leaked and stored as static root: {:?}",
                                alloc
                            );
                            this.machine.static_roots.push(alloc);
                        },
                }
            }
        }

        // Tell the data race handler, if any, that the thread is gone.
        match &mut this.machine.data_race {
            GlobalDataRaceHandler::None => {}
            GlobalDataRaceHandler::Vclocks(data_race) =>
                data_race.thread_terminated(&this.machine.threads),
            GlobalDataRaceHandler::Genmc(genmc_ctx) => {
                genmc_ctx.handle_thread_finish(&this.machine.threads)
            }
        }

        // Wake up everyone who is waiting to join this thread. The ids are collected
        // first because unblocking needs mutable access to the thread list.
        let unblock_reason = BlockReason::Join(gone_thread);
        let threads = &this.machine.threads.threads;
        let joining_threads = threads
            .iter_enumerated()
            .filter(|(_, thread)| thread.state.is_blocked_on(unblock_reason))
            .map(|(id, _)| id)
            .collect::<Vec<_>>();
        for thread in joining_threads {
            this.unblock_thread(thread, unblock_reason)?;
        }

        interp_ok(())
    }

    /// Blocks the active thread for `reason`, optionally with a timeout.
    ///
    /// The timeout is given as a clock, an anchor and a duration, and is converted
    /// into an absolute deadline here. `callback` runs when the thread is unblocked
    /// or the timeout expires.
    ///
    /// Panics if a timeout is requested in GenMC mode, or if a `RealTime` timeout is
    /// requested while the machine may not communicate with the host.
    #[inline]
    fn block_thread(
        &mut self,
        reason: BlockReason,
        timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
        callback: DynUnblockCallback<'tcx>,
    ) {
        let this = self.eval_context_mut();
        if timeout.is_some() && this.machine.data_race.as_genmc_ref().is_some() {
            panic!("Unimplemented: Timeouts not yet supported in GenMC mode.");
        }
        let timeout = timeout.map(|(clock, anchor, duration)| {
            // Determine the point in time the duration is counted from...
            let anchor = match clock {
                TimeoutClock::RealTime => {
                    assert!(
                        this.machine.communicate(),
                        "cannot have `RealTime` timeout with isolation enabled!"
                    );
                    Timeout::RealTime(match anchor {
                        TimeoutAnchor::Absolute => SystemTime::UNIX_EPOCH,
                        TimeoutAnchor::Relative => SystemTime::now(),
                    })
                }
                TimeoutClock::Monotonic =>
                    Timeout::Monotonic(match anchor {
                        TimeoutAnchor::Absolute => this.machine.monotonic_clock.epoch(),
                        TimeoutAnchor::Relative => this.machine.monotonic_clock.now(),
                    }),
            };
            // ...and add the duration to obtain the deadline, tolerating overflow.
            anchor.add_lossy(duration)
        });
        this.machine.threads.block_thread(reason, timeout, callback);
    }

    /// Unblocks `thread`, which must currently be blocked for `reason`, and runs its
    /// unblock callback with `UnblockKind::Ready`.
    ///
    /// Panics if the thread is not blocked or is blocked for a different reason.
    fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) -> InterpResult<'tcx> {
        let this = self.eval_context_mut();
        // The thread becomes enabled *before* its callback runs, so the callback is
        // free to block the thread again.
        let old_state =
            mem::replace(&mut this.machine.threads.threads[thread].state, ThreadState::Enabled);
        let callback = match old_state {
            ThreadState::Blocked { reason: actual_reason, callback, .. } => {
                assert_eq!(
                    reason, actual_reason,
                    "unblock_thread: thread was blocked for the wrong reason"
                );
                callback
            }
            _ => panic!("unblock_thread: thread was not blocked"),
        };
        // Run the callback with the unblocked thread as the active thread, then switch
        // back to the thread that was active before.
        let old_thread = this.machine.threads.set_active_thread_id(thread);
        callback.call(this, UnblockKind::Ready)?;
        this.machine.threads.set_active_thread_id(old_thread);
        interp_ok(())
    }

    /// Detaches the given thread; see `ThreadManager::detach_thread` for when this is
    /// reported as undefined behavior.
    #[inline]
    fn detach_thread(
        &mut self,
        thread_id: ThreadId,
        allow_terminated_joined: bool,
    ) -> InterpResult<'tcx> {
        let this = self.eval_context_mut();
        this.machine.threads.detach_thread(thread_id, allow_terminated_joined)
    }

    /// Makes the active thread join `joined_thread_id`.
    ///
    /// If the joined thread has already terminated, the join completes immediately;
    /// otherwise the active thread blocks until it terminates. When the join
    /// completes, `success_retval` is written to `return_dest`.
    ///
    /// Joining a detached thread is reported as undefined behavior.
    fn join_thread(
        &mut self,
        joined_thread_id: ThreadId,
        success_retval: Scalar,
        return_dest: &MPlaceTy<'tcx>,
    ) -> InterpResult<'tcx> {
        let this = self.eval_context_mut();
        let thread_mgr = &mut this.machine.threads;
        if thread_mgr.threads[joined_thread_id].join_status == ThreadJoinStatus::Detached {
            throw_ub_format!("trying to join a detached thread");
        }

        // Completes the join: notifies the data race handler, if any, and writes the
        // return value. Used both for the immediate case and from the unblock callback.
        fn after_join<'tcx>(
            this: &mut InterpCx<'tcx, MiriMachine<'tcx>>,
            joined_thread_id: ThreadId,
            success_retval: Scalar,
            return_dest: &MPlaceTy<'tcx>,
        ) -> InterpResult<'tcx> {
            let threads = &this.machine.threads;
            match &mut this.machine.data_race {
                GlobalDataRaceHandler::None => {}
                GlobalDataRaceHandler::Vclocks(data_race) =>
                    data_race.thread_joined(threads, joined_thread_id),
                GlobalDataRaceHandler::Genmc(genmc_ctx) =>
                    genmc_ctx.handle_thread_join(threads.active_thread, joined_thread_id)?,
            }
            this.write_scalar(success_retval, return_dest)?;
            interp_ok(())
        }

        // Mark the thread as joined right away, so that further join attempts can be
        // detected (see `join_thread_exclusive`).
        thread_mgr.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
        if !thread_mgr.threads[joined_thread_id].state.is_terminated() {
            trace!(
                "{:?} blocked on {:?} when trying to join",
                thread_mgr.active_thread, joined_thread_id
            );
            // In GenMC mode, GenMC is informed about the join at this point already,
            // before the active thread blocks.
            if let Some(genmc_ctx) = this.machine.data_race.as_genmc_ref() {
                genmc_ctx.handle_thread_join(thread_mgr.active_thread, joined_thread_id)?;
            }

            // The joined thread is still running: block until it terminates and
            // complete the join from the unblock callback.
            let dest = return_dest.clone();
            thread_mgr.block_thread(
                BlockReason::Join(joined_thread_id),
                None,
                callback!(
                    @capture<'tcx> {
                        joined_thread_id: ThreadId,
                        dest: MPlaceTy<'tcx>,
                        success_retval: Scalar,
                    }
                    |this, unblock: UnblockKind| {
                        assert_eq!(unblock, UnblockKind::Ready);
                        after_join(this, joined_thread_id, success_retval, &dest)
                    }
                ),
            );
        } else {
            // The joined thread has already terminated: complete the join right away.
            after_join(this, joined_thread_id, success_retval, return_dest)?;
        }
        interp_ok(())
    }

    /// Like `join_thread`, but additionally reports it as undefined behavior if the
    /// thread has already been joined or if the active thread tries to join itself.
    fn join_thread_exclusive(
        &mut self,
        joined_thread_id: ThreadId,
        success_retval: Scalar,
        return_dest: &MPlaceTy<'tcx>,
    ) -> InterpResult<'tcx> {
        let this = self.eval_context_mut();
        let threads = &this.machine.threads.threads;
        if threads[joined_thread_id].join_status == ThreadJoinStatus::Joined {
            throw_ub_format!("trying to join an already joined thread");
        }

        if joined_thread_id == this.machine.threads.active_thread {
            throw_ub_format!("trying to join itself");
        }

        // Sanity check: since the thread is not marked as joined, no thread may be
        // blocked on joining it.
        assert!(
            threads
                .iter()
                .all(|thread| { !thread.state.is_blocked_on(BlockReason::Join(joined_thread_id)) }),
            "this thread already has threads waiting for its termination"
        );

        this.join_thread(joined_thread_id, success_retval, return_dest)
    }

    /// Returns the id of the currently active thread.
    #[inline]
    fn active_thread(&self) -> ThreadId {
        let this = self.eval_context_ref();
        this.machine.threads.active_thread()
    }

    /// Mutably borrows the currently active thread.
    #[inline]
    fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
        let this = self.eval_context_mut();
        this.machine.threads.active_thread_mut()
    }

    /// Borrows the currently active thread.
    #[inline]
    fn active_thread_ref(&self) -> &Thread<'tcx> {
        let this = self.eval_context_ref();
        this.machine.threads.active_thread_ref()
    }

    /// Returns how many threads have been created so far, including terminated ones.
    #[inline]
    fn get_total_thread_count(&self) -> usize {
        let this = self.eval_context_ref();
        this.machine.threads.get_total_thread_count()
    }

    /// Returns `true` iff every thread has terminated.
    #[inline]
    fn have_all_terminated(&self) -> bool {
        let this = self.eval_context_ref();
        this.machine.threads.have_all_terminated()
    }

    /// Puts a terminated thread back into the `Enabled` state; panics if the thread
    /// has not terminated.
    #[inline]
    fn enable_thread(&mut self, thread_id: ThreadId) {
        let this = self.eval_context_mut();
        this.machine.threads.enable_thread(thread_id);
    }

    /// Borrows the stack of the active thread.
    #[inline]
    fn active_thread_stack<'a>(&'a self) -> &'a [Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
        let this = self.eval_context_ref();
        this.machine.threads.active_thread_stack()
    }

    /// Mutably borrows the stack of the active thread.
    #[inline]
    fn active_thread_stack_mut<'a>(
        &'a mut self,
    ) -> &'a mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
        let this = self.eval_context_mut();
        this.machine.threads.active_thread_stack_mut()
    }

    /// Sets the name of the given thread, replacing any previous name.
    #[inline]
    fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
        self.eval_context_mut().machine.threads.set_thread_name(thread, new_thread_name);
    }

    /// Returns the raw name of the given thread, or `None` if it has not been named.
    #[inline]
    fn get_thread_name<'c>(&'c self, thread: ThreadId) -> Option<&'c [u8]>
    where
        'tcx: 'c,
    {
        self.eval_context_ref().machine.threads.get_thread_name(thread)
    }

    /// Requests that the scheduler move on from the active thread the next time it
    /// runs.
    #[inline]
    fn yield_active_thread(&mut self) {
        self.eval_context_mut().machine.threads.yield_active_thread();
    }

    /// Randomly preempts the active thread: unless fixed scheduling is enabled, the
    /// thread is made to yield with probability `preemption_rate`.
    #[inline]
    fn maybe_preempt_active_thread(&mut self) {
        use rand::Rng as _;

        let this = self.eval_context_mut();
        if !this.machine.threads.fixed_scheduling
            && this.machine.rng.get_mut().random_bool(this.machine.preemption_rate)
        {
            this.yield_active_thread();
        }
    }

    /// The main loop of the interpreter: repeatedly asks the scheduler what to do and
    /// does it.
    ///
    /// This function never returns normally; it only ever returns an error, which
    /// includes machine stops such as `TerminationInfo::Interrupted`.
    fn run_threads(&mut self) -> InterpResult<'tcx, !> {
        let this = self.eval_context_mut();
        loop {
            // Stop when Ctrl-C was received.
            if CTRL_C_RECEIVED.load(Relaxed) {
                this.machine.handle_abnormal_termination();
                throw_machine_stop!(TerminationInfo::Interrupted);
            }
            match this.schedule()? {
                SchedulingAction::ExecuteStep => {
                    // `step` returning `false` means the active thread has nothing
                    // left to execute, so its `on_stack_empty` callback decides what
                    // happens next.
                    if !this.step()? {
                        match this.run_on_stack_empty()? {
                            // The callback is not done yet; keep the thread alive.
                            Poll::Pending => {}
                            // The callback is done; the thread can be terminated.
                            Poll::Ready(()) =>
                                this.terminate_active_thread(TlsAllocAction::Deallocate)?,
                        }
                    }
                }
                SchedulingAction::ExecuteTimeoutCallback => {
                    this.run_timeout_callback()?;
                }
                SchedulingAction::Sleep(duration) => {
                    this.machine.monotonic_clock.sleep(duration);
                }
            }
        }
    }
}