compiletest/executor/
deadline.rs1use std::collections::VecDeque;
2use std::sync::mpsc::{self, RecvError, RecvTimeoutError};
3use std::time::{Duration, Instant};
4
5use crate::executor::{CollectedTest, TestId};
6
/// Number of seconds between a test being pushed onto a `DeadlineQueue` and
/// the moment its deadline is considered to have passed.
const TEST_WARN_TIMEOUT_S: u64 = 60;

9struct DeadlineEntry<'a> {
10 id: TestId,
11 test: &'a CollectedTest,
12 deadline: Instant,
13}
14
15pub(crate) struct DeadlineQueue<'a> {
16 queue: VecDeque<DeadlineEntry<'a>>,
17}
18
19impl<'a> DeadlineQueue<'a> {
20 pub(crate) fn with_capacity(capacity: usize) -> Self {
21 Self { queue: VecDeque::with_capacity(capacity) }
22 }
23
24 fn now(&self) -> Instant {
27 Instant::now()
28 }
29
30 pub(crate) fn push(&mut self, id: TestId, test: &'a CollectedTest) {
31 let deadline = self.now() + Duration::from_secs(TEST_WARN_TIMEOUT_S);
32 if let Some(back) = self.queue.back() {
33 assert!(back.deadline <= deadline);
34 }
35 self.queue.push_back(DeadlineEntry { id, test, deadline });
36 }
37
38 pub(crate) fn read_channel_while_checking_deadlines<T>(
41 &mut self,
42 rx: &mpsc::Receiver<T>,
43 is_running: impl Fn(TestId) -> bool,
44 mut on_deadline_passed: impl FnMut(TestId, &CollectedTest),
45 ) -> Result<T, RecvError> {
46 loop {
47 let Some(next_deadline) = self.next_deadline() else {
48 return rx.recv();
51 };
52 let next_deadline_timeout = next_deadline.saturating_duration_since(self.now());
53
54 let recv_result = rx.recv_timeout(next_deadline_timeout);
55 self.for_each_entry_past_deadline(&is_running, &mut on_deadline_passed);
59
60 match recv_result {
61 Ok(value) => return Ok(value),
62 Err(RecvTimeoutError::Timeout) => {}
64 Err(RecvTimeoutError::Disconnected) => return Err(RecvError),
65 }
66 }
67 }
68
69 fn next_deadline(&self) -> Option<Instant> {
70 Some(self.queue.front()?.deadline)
71 }
72
73 fn for_each_entry_past_deadline(
74 &mut self,
75 is_running: impl Fn(TestId) -> bool,
76 mut on_deadline_passed: impl FnMut(TestId, &CollectedTest),
77 ) {
78 let now = self.now();
79
80 while let Some(entry) = pop_front_if(&mut self.queue, |entry| entry.deadline <= now) {
83 if is_running(entry.id) {
84 on_deadline_passed(entry.id, entry.test);
85 }
86 }
87
88 while let Some(_) = pop_front_if(&mut self.queue, |entry| !is_running(entry.id)) {}
91
92 if let Some(front) = self.queue.front() {
93 assert!(now < front.deadline);
94 }
95 }
96}
97
/// Removes and returns the front element of `queue` if `predicate` accepts it.
///
/// Returns `None` and leaves the queue unchanged when the queue is empty (in
/// which case `predicate` is never called) or when `predicate` returns
/// `false` for the front element.
///
/// NOTE(review): the standard library has an equivalent
/// `VecDeque::pop_front_if`; consider switching to it once it is available on
/// every toolchain this crate must build with — confirm before changing.
fn pop_front_if<T>(queue: &mut VecDeque<T>, predicate: impl FnOnce(&T) -> bool) -> Option<T> {
    let first = queue.front()?;
    if predicate(first) { queue.pop_front() } else { None }
}