rustc_data_structures/
jobserver.rs1use std::sync::{Arc, LazyLock, OnceLock};
2
3pub use jobserver_crate::{Acquired, Client, HelperThread};
4use jobserver_crate::{FromEnv, FromEnvErrorKind};
5use parking_lot::{Condvar, Mutex};
6
7static GLOBAL_CLIENT: LazyLock<Result<Client, String>> = LazyLock::new(|| {
12 let FromEnv { client, var } = unsafe { Client::from_env_ext(true) };
18
19 let error = match client {
20 Ok(client) => return Ok(client),
21 Err(e) => e,
22 };
23
24 if matches!(
25 error.kind(),
26 FromEnvErrorKind::NoEnvVar
27 | FromEnvErrorKind::NoJobserver
28 | FromEnvErrorKind::NegativeFd
29 | FromEnvErrorKind::Unsupported
30 ) {
31 return Ok(default_client());
32 }
33
34 let (name, value) = var.unwrap();
37 Err(format!(
38 "failed to connect to jobserver from environment variable `{name}={:?}`: {error}",
39 value
40 ))
41});
42
43fn default_client() -> Client {
45 let client = Client::new(32).expect("failed to create jobserver");
49
50 client.acquire_raw().ok();
52
53 client
54}
55
56static GLOBAL_CLIENT_CHECKED: OnceLock<Client> = OnceLock::new();
57
58pub fn initialize_checked(report_warning: impl FnOnce(&'static str)) {
59 let client_checked = match &*GLOBAL_CLIENT {
60 Ok(client) => client.clone(),
61 Err(e) => {
62 report_warning(e);
63 default_client()
64 }
65 };
66 GLOBAL_CLIENT_CHECKED.set(client_checked).ok();
67}
68
69const ACCESS_ERROR: &str = "jobserver check should have been called earlier";
70
71pub fn client() -> Client {
72 GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).clone()
73}
74
75struct ProxyData {
76 used: u16,
79
80 pending: u16,
82}
83
84pub struct Proxy {
86 client: Client,
87 data: Mutex<ProxyData>,
88
89 wake_pending: Condvar,
91
92 helper: OnceLock<HelperThread>,
93}
94
95impl Proxy {
96 pub fn new() -> Arc<Self> {
97 let proxy = Arc::new(Proxy {
98 client: client(),
99 data: Mutex::new(ProxyData { used: 1, pending: 0 }),
100 wake_pending: Condvar::new(),
101 helper: OnceLock::new(),
102 });
103 let proxy_ = Arc::clone(&proxy);
104 let helper = proxy
105 .client
106 .clone()
107 .into_helper_thread(move |token| {
108 if let Ok(token) = token {
109 let mut data = proxy_.data.lock();
110 if data.pending > 0 {
111 token.drop_without_releasing();
113 assert!(data.used > 0);
114 data.used += 1;
115 data.pending -= 1;
116 proxy_.wake_pending.notify_one();
117 } else {
118 drop(data);
120 drop(token);
121 }
122 }
123 })
124 .expect("failed to create helper thread");
125 proxy.helper.set(helper).unwrap();
126 proxy
127 }
128
129 pub fn acquire_thread(&self) {
130 let mut data = self.data.lock();
131
132 if data.used == 0 {
133 assert_eq!(data.pending, 0);
136 data.used += 1;
137 } else {
138 self.helper.get().unwrap().request_token();
142 data.pending += 1;
143 self.wake_pending.wait(&mut data);
144 }
145 }
146
147 pub fn release_thread(&self) {
148 let mut data = self.data.lock();
149
150 if data.pending > 0 {
151 data.pending -= 1;
153 self.wake_pending.notify_one();
154 } else {
155 data.used -= 1;
156
157 if data.used > 0 {
159 drop(data);
160 self.client.release_raw().ok();
161 }
162 }
163 }
164}