miri/concurrency/blocking_io.rs
1use std::cell::RefMut;
2use std::collections::BTreeMap;
3use std::io;
4use std::ops::BitOrAssign;
5use std::time::Duration;
6
7use mio::event::Source;
8use mio::{Events, Interest, Poll, Token};
9
10use crate::shims::{
11 EpollEvalContextExt, FdId, FileDescription, FileDescriptionRef, WeakFileDescriptionRef,
12};
13use crate::*;
14
/// Maximum number of I/O events which a single poll can hand back at once.
/// Many simultaneous blocking I/O events are not expected, so a small
/// capacity is sufficient.
const IO_EVENT_CAPACITY: usize = 16;
19
20/// Trait for file descriptions that contain a mio [`Source`].
21pub trait SourceFileDescription: FileDescription {
22 /// Invoke `f` on the source inside `self`.
23 fn with_source(&self, f: &mut dyn FnMut(&mut dyn Source) -> io::Result<()>) -> io::Result<()>;
24
25 /// Get a mutable reference to the readiness of the source.
26 fn get_readiness_mut(&self) -> RefMut<'_, BlockingIoSourceReadiness>;
27}
28
/// The I/O interest a blocked thread is waiting for. Regardless of the
/// variant, every thread is always considered to be interested in "error"
/// events as well.
#[derive(Debug, Clone, Copy)]
pub enum BlockingIoInterest {
    /// Interest in [`Interest::READABLE`].
    Read,
    /// Interest in [`Interest::WRITABLE`].
    Write,
    /// Interest in both [`Interest::READABLE`] and [`Interest::WRITABLE`].
    ReadWrite,
}
41
/// The readiness state of a source file description, one flag per kind of
/// readiness.
#[derive(Debug)]
pub struct BlockingIoSourceReadiness {
    /// Set while the source is readable.
    pub readable: bool,
    /// Set while the source is writable.
    pub writable: bool,
    /// Set once the read end of the source has been closed.
    pub read_closed: bool,
    /// Set once the write end of the source has been closed.
    pub write_closed: bool,
    /// Set while the source has an error.
    pub error: bool,
}
58
59impl BlockingIoSourceReadiness {
60 pub fn empty() -> Self {
61 Self {
62 readable: false,
63 writable: false,
64 read_closed: false,
65 write_closed: false,
66 error: false,
67 }
68 }
69
70 /// Check whether the current readiness fulfills the blocking I/O interest of
71 /// `interest`.
72 /// This function also returns `true` if the error readiness is set
73 /// even when the requested interest might not be fulfilled.
74 fn fulfills_interest(&self, interest: &BlockingIoInterest) -> bool {
75 match interest {
76 BlockingIoInterest::Read => self.readable || self.error,
77 BlockingIoInterest::Write => self.writable || self.error,
78 BlockingIoInterest::ReadWrite => self.readable || self.writable || self.error,
79 }
80 }
81}
82
83impl BitOrAssign for BlockingIoSourceReadiness {
84 fn bitor_assign(&mut self, rhs: Self) {
85 self.readable |= rhs.readable;
86 self.writable |= rhs.writable;
87 self.read_closed |= rhs.read_closed;
88 self.write_closed |= rhs.write_closed;
89 self.error |= rhs.error;
90 }
91}
92
93impl From<&mio::event::Event> for BlockingIoSourceReadiness {
94 fn from(event: &mio::event::Event) -> Self {
95 Self {
96 readable: event.is_readable(),
97 writable: event.is_writable(),
98 read_closed: event.is_read_closed(),
99 write_closed: event.is_write_closed(),
100 error: event.is_error(),
101 }
102 }
103}
104
105struct BlockingIoSource {
106 /// The source file description which is registered into the poll.
107 /// We only store weak references such that source file descriptions
108 /// can be destroyed whilst they are registered. However, they are required
109 /// to deregister themselves when [`FileDescription::destroy`] is called.
110 fd: WeakFileDescriptionRef<dyn SourceFileDescription>,
111 /// The threads which are blocked on the I/O source, and the interest indicating
112 /// when they should be unblocked.
113 blocked_threads: BTreeMap<ThreadId, BlockingIoInterest>,
114}
115
116/// Manager for managing blocking host I/O in a non-blocking manner.
117/// We use [`Poll`] to poll for new I/O events from the OS for sources
118/// registered using this manager.
119///
120/// The semantics of this manager are that host I/O sources are registered
121/// to a [`Poll`] for their entire lifespan. Once host readiness events happen
122/// on a registered source, its internal epoll readiness gets updated -- even
123/// when the source isn't part of an active epoll instance. Also, for the entire
124/// lifespan of the source, threads can be added which should be unblocked
125/// once a certain [`BlockingIoSourceReadiness`] for an I/O source is satisfied.
126///
127/// Since blocking host I/O is inherently non-deterministic, no method on this
128/// manager should be called when isolation is enabled. The only exception is
129/// the [`BlockingIoManager::new`] function to create the manager. Everywhere else,
130/// we assert that isolation is disabled!
131pub struct BlockingIoManager {
132 /// Poll instance to monitor I/O events from the OS.
133 /// This is only [`None`] when Miri is run with isolation enabled.
134 poll: Option<Poll>,
135 /// Buffer used to store the ready I/O events when calling [`Poll::poll`].
136 /// This is not part of the state and only stored to avoid allocating a
137 /// new buffer for every poll.
138 events: Events,
139 /// Map from source file description ids to the actual sources and their
140 /// blocked threads.
141 sources: BTreeMap<FdId, BlockingIoSource>,
142}
143
144impl BlockingIoManager {
145 /// Create a new blocking I/O manager instance based on the availability
146 /// of communication with the host.
147 pub fn new(communicate: bool) -> Result<Self, io::Error> {
148 let manager = Self {
149 poll: communicate.then_some(Poll::new()?),
150 events: Events::with_capacity(IO_EVENT_CAPACITY),
151 sources: BTreeMap::default(),
152 };
153 Ok(manager)
154 }
155
156 /// Poll for new I/O events from the OS or wait until the timeout expired.
157 /// The timeout semantics are the same as described in [`Poll::poll`].
158 /// The events also immediately get processed: threads get unblocked, and epoll readiness gets updated.
159 fn poll<'tcx>(
160 ecx: &mut MiriInterpCx<'tcx>,
161 timeout: Option<Duration>,
162 ) -> InterpResult<'tcx, Result<(), io::Error>> {
163 let poll = ecx
164 .machine
165 .blocking_io
166 .poll
167 .as_mut()
168 .expect("Blocking I/O should not be called with isolation enabled");
169
170 // Poll for new I/O events from OS and store them in the events buffer.
171 if let Err(err) = poll.poll(&mut ecx.machine.blocking_io.events, timeout) {
172 return interp_ok(Err(err));
173 };
174
175 let event_fds = ecx
176 .machine
177 .blocking_io
178 .events
179 .iter()
180 .map(|event| {
181 let token = event.token();
182 // We know all tokens are valid `FdId`.
183 let fd_id = FdId::new_unchecked(token.0);
184 let source = ecx
185 .machine
186 .blocking_io
187 .sources
188 .get(&fd_id)
189 .expect("Source should be registered");
190 let fd = source.fd.upgrade().expect(
191 "Source file description shouldn't be destroyed whilst being registered",
192 );
193
194 assert_eq!(fd.id(), fd_id);
195 // Update the readiness of the source.
196 *fd.get_readiness_mut() |= BlockingIoSourceReadiness::from(event);
197 // Put FD into `event_fds` list.
198 fd
199 })
200 .collect::<Vec<_>>();
201
202 // Update the epoll readiness for all source file descriptions which received an event. Also,
203 // unblock the threads which are blocked on such a source and whose interests are now fulfilled.
204 for fd in event_fds.into_iter() {
205 // Update epoll readiness for the `fd` source.
206 ecx.update_epoll_active_events(fd.clone(), false)?;
207
208 let source =
209 ecx.machine.blocking_io.sources.get(&fd.id()).expect(
210 "Source file description shouldn't be destroyed whilst being registered",
211 );
212
213 // List of all thread id's whose interests are currently fulfilled
214 // and which are blocked on the `fd` source. This also includes
215 // threads whose interests were already fulfilled before the
216 // `poll` invocation.
217 let threads = source
218 .blocked_threads
219 .iter()
220 .filter_map(|(thread_id, interest)| {
221 fd.get_readiness_mut().fulfills_interest(interest).then_some(*thread_id)
222 })
223 .collect::<Vec<_>>();
224
225 // Unblock all threads whose interests are currently fulfilled and
226 // which are blocked on the `fd` source.
227 threads
228 .into_iter()
229 .try_for_each(|thread_id| ecx.unblock_thread(thread_id, BlockReason::IO))?;
230 }
231
232 interp_ok(Ok(()))
233 }
234
235 /// Return whether a source file description is currently registered in the
236 /// blocking I/O poll.
237 /// This can also be used to check whether a file description is a host
238 /// I/O source.
239 pub fn contains_source(&self, source_id: &FdId) -> bool {
240 self.sources.contains_key(source_id)
241 }
242
243 /// Register a source file description to the blocking I/O poll.
244 pub fn register(&mut self, source_fd: FileDescriptionRef<dyn SourceFileDescription>) {
245 let poll =
246 self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled");
247
248 let id = source_fd.id();
249 let token = Token(id.to_usize());
250
251 // All possible interests.
252 // We only care about the readable and writable interests because those are the only
253 // interests which are available on all platforms. Internally, mio also
254 // registers an error interest.
255 let interest = Interest::READABLE | Interest::WRITABLE;
256
257 // Treat errors from registering as fatal. On UNIX hosts this can only
258 // fail due to system resource errors (e.g. ENOMEM or ENOSPC) or when the source is already registered.
259 source_fd
260 .with_source(&mut |source| poll.registry().register(source, token, interest))
261 .unwrap();
262
263 let source = BlockingIoSource {
264 fd: FileDescriptionRef::downgrade(&source_fd),
265 blocked_threads: BTreeMap::default(),
266 };
267
268 self.sources
269 .try_insert(id, source)
270 .unwrap_or_else(|_| panic!("Source should not already be registered"));
271 }
272
273 /// Deregister a source file description from the blocking I/O poll.
274 ///
275 /// It's assumed that the file description with id `source_id` is already
276 /// removed from the file description table.
277 pub fn deregister(&mut self, source_id: FdId, source: impl SourceFileDescription) {
278 let poll =
279 self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled");
280
281 let stored_source = self.sources.remove(&source_id).expect("Source should be registered");
282 // Ensure that the source file description is already removed from the file
283 // description table.
284 assert!(
285 stored_source.fd.upgrade().is_none(),
286 "Sources must only be deregistered when they are destroyed"
287 );
288
289 // Because we only store `WeakFileDescriptionRef`s and the `stored_source` file description
290 // is already destroyed, the weak reference can no longer be upgraded. Thus, we cannot use
291 // it to deregister the source from the poll and instead use the `source` argument to deregister.
292
293 // Treat errors from deregistering as fatal. On UNIX hosts this can only
294 // fail due to system resource errors (e.g. ENOMEM or ENOSPC).
295 source.with_source(&mut |source| poll.registry().deregister(source)).unwrap();
296 }
297
298 /// Add a new blocked thread to a registered source. The thread gets unblocked
299 /// once its [`BlockingIoInterest`] is fulfilled when calling
300 /// [`BlockingIoManager::poll`].
301 ///
302 /// It's assumed that the thread of `thread_id` isn't already blocked on
303 /// the source with id `source_id` and that this source is currently
304 /// registered.
305 fn add_blocked_thread(
306 &mut self,
307 source_id: FdId,
308 thread_id: ThreadId,
309 interest: BlockingIoInterest,
310 ) {
311 let source = self.sources.get_mut(&source_id).expect("Source should be registered");
312
313 source
314 .blocked_threads
315 .try_insert(thread_id, interest)
316 .expect("Thread cannot be blocked multiple times on the same source");
317 }
318
319 /// Remove a blocked thread from a registered source.
320 ///
321 /// It's assumed that the thread of `thread_id` is blocked on the
322 /// source with id `source_id` and that this source is currently
323 /// registered.
324 pub fn remove_blocked_thread(&mut self, source_id: FdId, thread_id: ThreadId) {
325 let source = self.sources.get_mut(&source_id).expect("Source should be registered");
326 source.blocked_threads.remove(&thread_id).expect("Thread should be blocked on source");
327 }
328}
329
330impl<'tcx> EvalContextExt<'tcx> for MiriInterpCx<'tcx> {}
331pub trait EvalContextExt<'tcx>: MiriInterpCxExt<'tcx> {
332 /// Block the current thread until some interests on an I/O source
333 /// are fulfilled or the optional timeout exceeded.
334 /// The callback will be invoked when the thread gets unblocked.
335 ///
336 /// Note that an error interest is implicitly added to `interest`.
337 /// This means that the thread will also be unblocked when the error
338 /// readiness gets set for the source even when the requested interest
339 /// might not be fulfilled.
340 ///
341 /// The callback function will immediately be executed with [`UnblockKind::Ready`]
342 /// when `interest` is already fulfilled for `source_fd`.
343 ///
344 /// There can also be spurious wake-ups by the OS and thus it's the callers
345 /// responsibility to verify that the requested I/O interests are
346 /// really ready and to block again if they're not.
347 ///
348 /// It's the callers responsibility to remove the [`BlockingIoInterest`]
349 /// from the blocking I/O manager in the provided callback function.
350 #[inline]
351 fn block_thread_for_io(
352 &mut self,
353 source_fd: FileDescriptionRef<dyn SourceFileDescription>,
354 interest: BlockingIoInterest,
355 deadline: Option<Deadline>,
356 callback: DynUnblockCallback<'tcx>,
357 ) -> InterpResult<'tcx> {
358 let this = self.eval_context_mut();
359
360 // We always have to do this since the thread will de-register itself.
361 this.machine.blocking_io.add_blocked_thread(source_fd.id(), this.active_thread(), interest);
362
363 if source_fd.get_readiness_mut().fulfills_interest(&interest) {
364 // The requested readiness is currently already fulfilled for the provided source.
365 // Instead of actually blocking the thread, we just run the callback function.
366 callback.call(this, UnblockKind::Ready)
367 } else {
368 // The I/O readiness is currently not fulfilled. We block the thread
369 // until the readiness is fulfilled and execute the callback then.
370 this.block_thread(BlockReason::IO, deadline, callback);
371 interp_ok(())
372 }
373 }
374
375 /// Poll for I/O events until either an I/O event happened or the timeout expired.
376 ///
377 /// - If the timeout is [`Some`] and contains [`Duration::ZERO`], the poll doesn't block and just
378 /// reads all events since the last poll.
379 /// - If the timeout is [`Some`] and contains a non-zero duration, it blocks at most for the
380 /// specified duration.
381 /// - If the timeout is [`None`] the poll blocks indefinitely until an event occurs.
382 ///
383 /// Unblocks all threads which are blocked on I/O and whose I/O interests
384 /// are currently fulfilled.
385 fn poll_and_unblock(&mut self, timeout: Option<Duration>) -> InterpResult<'tcx> {
386 let this = self.eval_context_mut();
387
388 match BlockingIoManager::poll(this, timeout)? {
389 Ok(_) => interp_ok(()),
390 // We can ignore errors originating from interrupts; that's just a spurious wakeup.
391 Err(e) if e.kind() == io::ErrorKind::Interrupted => interp_ok(()),
392 // For other errors we panic. On Linux and BSD hosts this should only be
393 // reachable when a system resource error (e.g. ENOMEM or ENOSPC) occurred.
394 Err(e) => panic!("unexpected error while polling: {e}"),
395 }
396 }
397}