compiletest/executor/
deadline.rs

1use std::collections::VecDeque;
2use std::sync::mpsc::{self, RecvError, RecvTimeoutError};
3use std::time::{Duration, Instant};
4
5use crate::executor::{CollectedTest, TestId};
6
7const TEST_WARN_TIMEOUT_S: u64 = 60;
8
9struct DeadlineEntry<'a> {
10    id: TestId,
11    test: &'a CollectedTest,
12    deadline: Instant,
13}
14
15pub(crate) struct DeadlineQueue<'a> {
16    queue: VecDeque<DeadlineEntry<'a>>,
17}
18
19impl<'a> DeadlineQueue<'a> {
20    pub(crate) fn with_capacity(capacity: usize) -> Self {
21        Self { queue: VecDeque::with_capacity(capacity) }
22    }
23
24    /// All calls to [`Instant::now`] go through this wrapper method.
25    /// This makes it easier to find all places that read the current time.
26    fn now(&self) -> Instant {
27        Instant::now()
28    }
29
30    pub(crate) fn push(&mut self, id: TestId, test: &'a CollectedTest) {
31        let deadline = self.now() + Duration::from_secs(TEST_WARN_TIMEOUT_S);
32        if let Some(back) = self.queue.back() {
33            assert!(back.deadline <= deadline);
34        }
35        self.queue.push_back(DeadlineEntry { id, test, deadline });
36    }
37
38    /// Equivalent to `rx.recv()`, except that if a test exceeds its deadline
39    /// during the wait, the given callback will also be called for that test.
40    pub(crate) fn read_channel_while_checking_deadlines<T>(
41        &mut self,
42        rx: &mpsc::Receiver<T>,
43        is_running: impl Fn(TestId) -> bool,
44        mut on_deadline_passed: impl FnMut(TestId, &CollectedTest),
45    ) -> Result<T, RecvError> {
46        loop {
47            let Some(next_deadline) = self.next_deadline() else {
48                // All currently-running tests have already exceeded their
49                // deadline, so do a normal receive.
50                return rx.recv();
51            };
52            let next_deadline_timeout = next_deadline.saturating_duration_since(self.now());
53
54            let recv_result = rx.recv_timeout(next_deadline_timeout);
55            // Process deadlines after every receive attempt, regardless of
56            // outcome, so that we don't build up an unbounded backlog of stale
57            // entries due to a constant stream of tests finishing.
58            self.for_each_entry_past_deadline(&is_running, &mut on_deadline_passed);
59
60            match recv_result {
61                Ok(value) => return Ok(value),
62                // Deadlines have already been processed, so loop and do another receive.
63                Err(RecvTimeoutError::Timeout) => {}
64                Err(RecvTimeoutError::Disconnected) => return Err(RecvError),
65            }
66        }
67    }
68
69    fn next_deadline(&self) -> Option<Instant> {
70        Some(self.queue.front()?.deadline)
71    }
72
73    fn for_each_entry_past_deadline(
74        &mut self,
75        is_running: impl Fn(TestId) -> bool,
76        mut on_deadline_passed: impl FnMut(TestId, &CollectedTest),
77    ) {
78        let now = self.now();
79
80        // Clear out entries that are past their deadline, but only invoke the
81        // callback for tests that are still considered running.
82        while let Some(entry) = pop_front_if(&mut self.queue, |entry| entry.deadline <= now) {
83            if is_running(entry.id) {
84                on_deadline_passed(entry.id, entry.test);
85            }
86        }
87
88        // Also clear out any leading entries that are no longer running, even
89        // if their deadline hasn't been reached.
90        while let Some(_) = pop_front_if(&mut self.queue, |entry| !is_running(entry.id)) {}
91
92        if let Some(front) = self.queue.front() {
93            assert!(now < front.deadline);
94        }
95    }
96}
97
98/// FIXME(vec_deque_pop_if): Use `VecDeque::pop_front_if` when it is stable in bootstrap.
99fn pop_front_if<T>(queue: &mut VecDeque<T>, predicate: impl FnOnce(&T) -> bool) -> Option<T> {
100    let first = queue.front()?;
101    if predicate(first) { queue.pop_front() } else { None }
102}