rustc_data_structures/
jobserver.rs

1use std::sync::{Arc, LazyLock, OnceLock};
2
3pub use jobserver_crate::{Acquired, Client, HelperThread};
4use jobserver_crate::{FromEnv, FromEnvErrorKind};
5use parking_lot::{Condvar, Mutex};
6
7// We can only call `from_env_ext` once per process
8
9// We stick this in a global because there could be multiple rustc instances
10// in this process, and the jobserver is per-process.
11static GLOBAL_CLIENT: LazyLock<Result<Client, String>> = LazyLock::new(|| {
12    // Note that this is unsafe because it may misinterpret file descriptors
13    // on Unix as jobserver file descriptors. We hopefully execute this near
14    // the beginning of the process though to ensure we don't get false
15    // positives, or in other words we try to execute this before we open
16    // any file descriptors ourselves.
17    let FromEnv { client, var } = unsafe { Client::from_env_ext(true) };
18
19    let error = match client {
20        Ok(client) => return Ok(client),
21        Err(e) => e,
22    };
23
24    if matches!(
25        error.kind(),
26        FromEnvErrorKind::NoEnvVar
27            | FromEnvErrorKind::NoJobserver
28            | FromEnvErrorKind::NegativeFd
29            | FromEnvErrorKind::Unsupported
30    ) {
31        return Ok(default_client());
32    }
33
34    // Environment specifies jobserver, but it looks incorrect.
35    // Safety: `error.kind()` should be `NoEnvVar` if `var == None`.
36    let (name, value) = var.unwrap();
37    Err(format!(
38        "failed to connect to jobserver from environment variable `{name}={:?}`: {error}",
39        value
40    ))
41});
42
43// Create a new jobserver if there's no inherited one.
44fn default_client() -> Client {
45    // Pick a "reasonable maximum" capping out at 32
46    // so we don't take everything down by hogging the process run queue.
47    // The fixed number is used to have deterministic compilation across machines.
48    let client = Client::new(32).expect("failed to create jobserver");
49
50    // Acquire a token for the main thread which we can release later
51    client.acquire_raw().ok();
52
53    client
54}
55
56static GLOBAL_CLIENT_CHECKED: OnceLock<Client> = OnceLock::new();
57
58pub fn initialize_checked(report_warning: impl FnOnce(&'static str)) {
59    let client_checked = match &*GLOBAL_CLIENT {
60        Ok(client) => client.clone(),
61        Err(e) => {
62            report_warning(e);
63            default_client()
64        }
65    };
66    GLOBAL_CLIENT_CHECKED.set(client_checked).ok();
67}
68
69const ACCESS_ERROR: &str = "jobserver check should have been called earlier";
70
71pub fn client() -> Client {
72    GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).clone()
73}
74
/// Token bookkeeping shared between a `Proxy`'s users and its helper thread.
struct ProxyData {
    /// How many tokens are currently assigned to threads.
    /// A value of 0 means the process still holds a single token, but no
    /// thread is using it.
    used: u16,

    /// How many threads are currently requesting a token.
    pending: u16,
}
83
84/// This is a jobserver proxy used to ensure that we hold on to at least one token.
85pub struct Proxy {
86    client: Client,
87    data: Mutex<ProxyData>,
88
89    /// Threads which are waiting on a token will wait on this.
90    wake_pending: Condvar,
91
92    helper: OnceLock<HelperThread>,
93}
94
95impl Proxy {
96    pub fn new() -> Arc<Self> {
97        let proxy = Arc::new(Proxy {
98            client: client(),
99            data: Mutex::new(ProxyData { used: 1, pending: 0 }),
100            wake_pending: Condvar::new(),
101            helper: OnceLock::new(),
102        });
103        let proxy_ = Arc::clone(&proxy);
104        let helper = proxy
105            .client
106            .clone()
107            .into_helper_thread(move |token| {
108                if let Ok(token) = token {
109                    let mut data = proxy_.data.lock();
110                    if data.pending > 0 {
111                        // Give the token to a waiting thread
112                        token.drop_without_releasing();
113                        assert!(data.used > 0);
114                        data.used += 1;
115                        data.pending -= 1;
116                        proxy_.wake_pending.notify_one();
117                    } else {
118                        // The token is no longer needed, drop it.
119                        drop(data);
120                        drop(token);
121                    }
122                }
123            })
124            .expect("failed to create helper thread");
125        proxy.helper.set(helper).unwrap();
126        proxy
127    }
128
129    pub fn acquire_thread(&self) {
130        let mut data = self.data.lock();
131
132        if data.used == 0 {
133            // There was a free token around. This can
134            // happen when all threads release their token.
135            assert_eq!(data.pending, 0);
136            data.used += 1;
137        } else {
138            // Request a token from the helper thread. We can't directly use `acquire_raw`
139            // as we also need to be able to wait for the final token in the process which
140            // does not get a corresponding `release_raw` call.
141            self.helper.get().unwrap().request_token();
142            data.pending += 1;
143            self.wake_pending.wait(&mut data);
144        }
145    }
146
147    pub fn release_thread(&self) {
148        let mut data = self.data.lock();
149
150        if data.pending > 0 {
151            // Give the token to a waiting thread
152            data.pending -= 1;
153            self.wake_pending.notify_one();
154        } else {
155            data.used -= 1;
156
157            // Release the token unless it's the last one in the process
158            if data.used > 0 {
159                drop(data);
160                self.client.release_raw().ok();
161            }
162        }
163    }
164}