Skip to main content

miri/concurrency/
blocking_io.rs

1use std::cell::RefMut;
2use std::collections::BTreeMap;
3use std::io;
4use std::ops::BitOrAssign;
5use std::time::Duration;
6
7use mio::event::Source;
8use mio::{Events, Interest, Poll, Token};
9
10use crate::shims::{
11    EpollEvalContextExt, FdId, FileDescription, FileDescriptionRef, WeakFileDescriptionRef,
12};
13use crate::*;
14
15/// Capacity of the event queue which can be polled at a time.
16/// Since we don't expect many simultaneous blocking I/O events
17/// this value can be set rather low.
18const IO_EVENT_CAPACITY: usize = 16;
19
20/// Trait for file descriptions that contain a mio [`Source`].
21pub trait SourceFileDescription: FileDescription {
22    /// Invoke `f` on the source inside `self`.
23    fn with_source(&self, f: &mut dyn FnMut(&mut dyn Source) -> io::Result<()>) -> io::Result<()>;
24
25    /// Get a mutable reference to the readiness of the source.
26    fn get_readiness_mut(&self) -> RefMut<'_, BlockingIoSourceReadiness>;
27}
28
29/// An I/O interest for a blocked thread. Note that all threads are always considered
30/// to be interested in "error" events.
31#[derive(Debug, Clone, Copy)]
32pub enum BlockingIoInterest {
33    /// The blocked thread is interested in [`Interest::READABLE`].
34    Read,
35    /// The blocked thread is interested in [`Interest::WRITABLE`].
36    Write,
37    /// The blocked thread is interested in [`Interest::READABLE`] and
38    /// [`Interest::WRITABLE`].
39    ReadWrite,
40}
41
42/// Struct reflecting the readiness of a source file description.
43#[derive(Debug)]
44pub struct BlockingIoSourceReadiness {
45    /// Boolean whether the source is currently readable.
46    pub readable: bool,
47    /// Boolean whether the source is currently writable.
48    pub writable: bool,
49    /// Boolean whether the read end of the source has been
50    /// closed.
51    pub read_closed: bool,
52    /// Boolean whether the write end of the source has been
53    /// closed.
54    pub write_closed: bool,
55    /// Boolean whether the source currently has an error.
56    pub error: bool,
57}
58
59impl BlockingIoSourceReadiness {
60    pub fn empty() -> Self {
61        Self {
62            readable: false,
63            writable: false,
64            read_closed: false,
65            write_closed: false,
66            error: false,
67        }
68    }
69
70    /// Check whether the current readiness fulfills the blocking I/O interest of
71    /// `interest`.
72    /// This function also returns `true` if the error readiness is set
73    /// even when the requested interest might not be fulfilled.
74    fn fulfills_interest(&self, interest: &BlockingIoInterest) -> bool {
75        match interest {
76            BlockingIoInterest::Read => self.readable || self.error,
77            BlockingIoInterest::Write => self.writable || self.error,
78            BlockingIoInterest::ReadWrite => self.readable || self.writable || self.error,
79        }
80    }
81}
82
83impl BitOrAssign for BlockingIoSourceReadiness {
84    fn bitor_assign(&mut self, rhs: Self) {
85        self.readable |= rhs.readable;
86        self.writable |= rhs.writable;
87        self.read_closed |= rhs.read_closed;
88        self.write_closed |= rhs.write_closed;
89        self.error |= rhs.error;
90    }
91}
92
93impl From<&mio::event::Event> for BlockingIoSourceReadiness {
94    fn from(event: &mio::event::Event) -> Self {
95        Self {
96            readable: event.is_readable(),
97            writable: event.is_writable(),
98            read_closed: event.is_read_closed(),
99            write_closed: event.is_write_closed(),
100            error: event.is_error(),
101        }
102    }
103}
104
105struct BlockingIoSource {
106    /// The source file description which is registered into the poll.
107    /// We only store weak references such that source file descriptions
108    /// can be destroyed whilst they are registered. However, they are required
109    /// to deregister themselves when [`FileDescription::destroy`] is called.
110    fd: WeakFileDescriptionRef<dyn SourceFileDescription>,
111    /// The threads which are blocked on the I/O source, and the interest indicating
112    /// when they should be unblocked.
113    blocked_threads: BTreeMap<ThreadId, BlockingIoInterest>,
114}
115
116/// Manager for managing blocking host I/O in a non-blocking manner.
117/// We use [`Poll`] to poll for new I/O events from the OS for sources
118/// registered using this manager.
119///
120/// The semantics of this manager are that host I/O sources are registered
121/// to a [`Poll`] for their entire lifespan. Once host readiness events happen
122/// on a registered source, its internal epoll readiness gets updated -- even
123/// when the source isn't part of an active epoll instance. Also, for the entire
124/// lifespan of the source, threads can be added which should be unblocked
125/// once a certain [`BlockingIoSourceReadiness`] for an I/O source is satisfied.
126///
127/// Since blocking host I/O is inherently non-deterministic, no method on this
128/// manager should be called when isolation is enabled. The only exception is
129/// the [`BlockingIoManager::new`] function to create the manager. Everywhere else,
130/// we assert that isolation is disabled!
131pub struct BlockingIoManager {
132    /// Poll instance to monitor I/O events from the OS.
133    /// This is only [`None`] when Miri is run with isolation enabled.
134    poll: Option<Poll>,
135    /// Buffer used to store the ready I/O events when calling [`Poll::poll`].
136    /// This is not part of the state and only stored to avoid allocating a
137    /// new buffer for every poll.
138    events: Events,
139    /// Map from source file description ids to the actual sources and their
140    /// blocked threads.
141    sources: BTreeMap<FdId, BlockingIoSource>,
142}
143
144impl BlockingIoManager {
145    /// Create a new blocking I/O manager instance based on the availability
146    /// of communication with the host.
147    pub fn new(communicate: bool) -> Result<Self, io::Error> {
148        let manager = Self {
149            poll: communicate.then_some(Poll::new()?),
150            events: Events::with_capacity(IO_EVENT_CAPACITY),
151            sources: BTreeMap::default(),
152        };
153        Ok(manager)
154    }
155
156    /// Poll for new I/O events from the OS or wait until the timeout expired.
157    /// The timeout semantics are the same as described in [`Poll::poll`].
158    /// The events also immediately get processed: threads get unblocked, and epoll readiness gets updated.
159    fn poll<'tcx>(
160        ecx: &mut MiriInterpCx<'tcx>,
161        timeout: Option<Duration>,
162    ) -> InterpResult<'tcx, Result<(), io::Error>> {
163        let poll = ecx
164            .machine
165            .blocking_io
166            .poll
167            .as_mut()
168            .expect("Blocking I/O should not be called with isolation enabled");
169
170        // Poll for new I/O events from OS and store them in the events buffer.
171        if let Err(err) = poll.poll(&mut ecx.machine.blocking_io.events, timeout) {
172            return interp_ok(Err(err));
173        };
174
175        let event_fds = ecx
176            .machine
177            .blocking_io
178            .events
179            .iter()
180            .map(|event| {
181                let token = event.token();
182                // We know all tokens are valid `FdId`.
183                let fd_id = FdId::new_unchecked(token.0);
184                let source = ecx
185                    .machine
186                    .blocking_io
187                    .sources
188                    .get(&fd_id)
189                    .expect("Source should be registered");
190                let fd = source.fd.upgrade().expect(
191                    "Source file description shouldn't be destroyed whilst being registered",
192                );
193
194                assert_eq!(fd.id(), fd_id);
195                // Update the readiness of the source.
196                *fd.get_readiness_mut() |= BlockingIoSourceReadiness::from(event);
197                // Put FD into `event_fds` list.
198                fd
199            })
200            .collect::<Vec<_>>();
201
202        // Update the epoll readiness for all source file descriptions which received an event. Also,
203        // unblock the threads which are blocked on such a source and whose interests are now fulfilled.
204        for fd in event_fds.into_iter() {
205            // Update epoll readiness for the `fd` source.
206            ecx.update_epoll_active_events(fd.clone(), false)?;
207
208            let source =
209                ecx.machine.blocking_io.sources.get(&fd.id()).expect(
210                    "Source file description shouldn't be destroyed whilst being registered",
211                );
212
213            // List of all thread id's whose interests are currently fulfilled
214            // and which are blocked on the `fd` source. This also includes
215            // threads whose interests were already fulfilled before the
216            // `poll` invocation.
217            let threads = source
218                .blocked_threads
219                .iter()
220                .filter_map(|(thread_id, interest)| {
221                    fd.get_readiness_mut().fulfills_interest(interest).then_some(*thread_id)
222                })
223                .collect::<Vec<_>>();
224
225            // Unblock all threads whose interests are currently fulfilled and
226            // which are blocked on the `fd` source.
227            threads
228                .into_iter()
229                .try_for_each(|thread_id| ecx.unblock_thread(thread_id, BlockReason::IO))?;
230        }
231
232        interp_ok(Ok(()))
233    }
234
235    /// Return whether a source file description is currently registered in the
236    /// blocking I/O poll.
237    /// This can also be used to check whether a file description is a host
238    /// I/O source.
239    pub fn contains_source(&self, source_id: &FdId) -> bool {
240        self.sources.contains_key(source_id)
241    }
242
243    /// Register a source file description to the blocking I/O poll.
244    pub fn register(&mut self, source_fd: FileDescriptionRef<dyn SourceFileDescription>) {
245        let poll =
246            self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled");
247
248        let id = source_fd.id();
249        let token = Token(id.to_usize());
250
251        // All possible interests.
252        // We only care about the readable and writable interests because those are the only
253        // interests which are available on all platforms. Internally, mio also
254        // registers an error interest.
255        let interest = Interest::READABLE | Interest::WRITABLE;
256
257        // Treat errors from registering as fatal. On UNIX hosts this can only
258        // fail due to system resource errors (e.g. ENOMEM or ENOSPC) or when the source is already registered.
259        source_fd
260            .with_source(&mut |source| poll.registry().register(source, token, interest))
261            .unwrap();
262
263        let source = BlockingIoSource {
264            fd: FileDescriptionRef::downgrade(&source_fd),
265            blocked_threads: BTreeMap::default(),
266        };
267
268        self.sources
269            .try_insert(id, source)
270            .unwrap_or_else(|_| panic!("Source should not already be registered"));
271    }
272
273    /// Deregister a source file description from the blocking I/O poll.
274    ///
275    /// It's assumed that the file description with id `source_id` is already
276    /// removed from the file description table.
277    pub fn deregister(&mut self, source_id: FdId, source: impl SourceFileDescription) {
278        let poll =
279            self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled");
280
281        let stored_source = self.sources.remove(&source_id).expect("Source should be registered");
282        // Ensure that the source file description is already removed from the file
283        // description table.
284        assert!(
285            stored_source.fd.upgrade().is_none(),
286            "Sources must only be deregistered when they are destroyed"
287        );
288
289        // Because we only store `WeakFileDescriptionRef`s and the `stored_source` file description
290        // is already destroyed, the weak reference can no longer be upgraded. Thus, we cannot use
291        // it to deregister the source from the poll and instead use the `source` argument to deregister.
292
293        // Treat errors from deregistering as fatal. On UNIX hosts this can only
294        // fail due to system resource errors (e.g. ENOMEM or ENOSPC).
295        source.with_source(&mut |source| poll.registry().deregister(source)).unwrap();
296    }
297
298    /// Add a new blocked thread to a registered source. The thread gets unblocked
299    /// once its [`BlockingIoInterest`] is fulfilled when calling
300    /// [`BlockingIoManager::poll`].
301    ///
302    /// It's assumed that the thread of `thread_id` isn't already blocked on
303    /// the source with id `source_id` and that this source is currently
304    /// registered.
305    fn add_blocked_thread(
306        &mut self,
307        source_id: FdId,
308        thread_id: ThreadId,
309        interest: BlockingIoInterest,
310    ) {
311        let source = self.sources.get_mut(&source_id).expect("Source should be registered");
312
313        source
314            .blocked_threads
315            .try_insert(thread_id, interest)
316            .expect("Thread cannot be blocked multiple times on the same source");
317    }
318
319    /// Remove a blocked thread from a registered source.
320    ///
321    /// It's assumed that the thread of `thread_id` is blocked on the
322    /// source with id `source_id` and that this source is currently
323    /// registered.
324    pub fn remove_blocked_thread(&mut self, source_id: FdId, thread_id: ThreadId) {
325        let source = self.sources.get_mut(&source_id).expect("Source should be registered");
326        source.blocked_threads.remove(&thread_id).expect("Thread should be blocked on source");
327    }
328}
329
330impl<'tcx> EvalContextExt<'tcx> for MiriInterpCx<'tcx> {}
331pub trait EvalContextExt<'tcx>: MiriInterpCxExt<'tcx> {
332    /// Block the current thread until some interests on an I/O source
333    /// are fulfilled or the optional timeout exceeded.
334    /// The callback will be invoked when the thread gets unblocked.
335    ///
336    /// Note that an error interest is implicitly added to `interest`.
337    /// This means that the thread will also be unblocked when the error
338    /// readiness gets set for the source even when the requested interest
339    /// might not be fulfilled.
340    ///
341    /// The callback function will immediately be executed with [`UnblockKind::Ready`]
342    /// when `interest` is already fulfilled for `source_fd`.
343    ///
344    /// There can also be spurious wake-ups by the OS and thus it's the callers
345    /// responsibility to verify that the requested I/O interests are
346    /// really ready and to block again if they're not.
347    ///
348    /// It's the callers responsibility to remove the [`BlockingIoInterest`]
349    /// from the blocking I/O manager in the provided callback function.
350    #[inline]
351    fn block_thread_for_io(
352        &mut self,
353        source_fd: FileDescriptionRef<dyn SourceFileDescription>,
354        interest: BlockingIoInterest,
355        deadline: Option<Deadline>,
356        callback: DynUnblockCallback<'tcx>,
357    ) -> InterpResult<'tcx> {
358        let this = self.eval_context_mut();
359
360        // We always have to do this since the thread will de-register itself.
361        this.machine.blocking_io.add_blocked_thread(source_fd.id(), this.active_thread(), interest);
362
363        if source_fd.get_readiness_mut().fulfills_interest(&interest) {
364            // The requested readiness is currently already fulfilled for the provided source.
365            // Instead of actually blocking the thread, we just run the callback function.
366            callback.call(this, UnblockKind::Ready)
367        } else {
368            // The I/O readiness is currently not fulfilled. We block the thread
369            // until the readiness is fulfilled and execute the callback then.
370            this.block_thread(BlockReason::IO, deadline, callback);
371            interp_ok(())
372        }
373    }
374
375    /// Poll for I/O events until either an I/O event happened or the timeout expired.
376    ///
377    /// - If the timeout is [`Some`] and contains [`Duration::ZERO`], the poll doesn't block and just
378    ///   reads all events since the last poll.
379    /// - If the timeout is [`Some`] and contains a non-zero duration, it blocks at most for the
380    ///   specified duration.
381    /// - If the timeout is [`None`] the poll blocks indefinitely until an event occurs.
382    ///
383    /// Unblocks all threads which are blocked on I/O and whose I/O interests
384    /// are currently fulfilled.
385    fn poll_and_unblock(&mut self, timeout: Option<Duration>) -> InterpResult<'tcx> {
386        let this = self.eval_context_mut();
387
388        match BlockingIoManager::poll(this, timeout)? {
389            Ok(_) => interp_ok(()),
390            // We can ignore errors originating from interrupts; that's just a spurious wakeup.
391            Err(e) if e.kind() == io::ErrorKind::Interrupted => interp_ok(()),
392            // For other errors we panic. On Linux and BSD hosts this should only be
393            // reachable when a system resource error (e.g. ENOMEM or ENOSPC) occurred.
394            Err(e) => panic!("unexpected error while polling: {e}"),
395        }
396    }
397}