cargo/core/
package.rs

1use std::cell::{Cell, Ref, RefCell, RefMut};
2use std::cmp::Ordering;
3use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
4use std::fmt;
5use std::hash;
6use std::mem;
7use std::path::{Path, PathBuf};
8use std::rc::Rc;
9use std::time::{Duration, Instant};
10
11use anyhow::Context as _;
12use cargo_util_schemas::manifest::{Hints, RustVersion};
13use curl::easy::Easy;
14use curl::multi::{EasyHandle, Multi};
15use lazycell::LazyCell;
16use semver::Version;
17use serde::Serialize;
18use tracing::debug;
19
20use crate::core::compiler::{CompileKind, RustcTargetData};
21use crate::core::dependency::DepKind;
22use crate::core::resolver::features::ForceAllTargets;
23use crate::core::resolver::{HasDevUnits, Resolve};
24use crate::core::{
25    CliUnstable, Dependency, Features, Manifest, PackageId, PackageIdSpec, SerializedDependency,
26    SourceId, Target,
27};
28use crate::core::{Summary, Workspace};
29use crate::sources::source::{MaybePackage, SourceMap};
30use crate::util::HumanBytes;
31use crate::util::cache_lock::{CacheLock, CacheLockMode};
32use crate::util::errors::{CargoResult, HttpNotSuccessful};
33use crate::util::interning::InternedString;
34use crate::util::network::http::HttpTimeout;
35use crate::util::network::http::http_handle_and_timeout;
36use crate::util::network::retry::{Retry, RetryResult};
37use crate::util::network::sleep::SleepTracker;
38use crate::util::{self, GlobalContext, Progress, ProgressStyle, internal};
39
/// Information about a package that is available somewhere in the file system.
///
/// A package is a `Cargo.toml` file plus all the files that are part of it.
#[derive(Clone)]
pub struct Package {
    /// Shared immutable state; the `Rc` keeps `Clone` cheap (a refcount bump).
    inner: Rc<PackageInner>,
}
47
/// Payload shared by clones of [`Package`]; copied on write via
/// `Rc::make_mut` when a mutable borrow of the manifest is needed.
#[derive(Clone)]
// TODO: is `manifest_path` a relic?
struct PackageInner {
    /// The package's manifest.
    manifest: Manifest,
    /// Path to the package's `Cargo.toml`; the package root directory is
    /// this path's parent (see `Package::root`).
    manifest_path: PathBuf,
}
56
57impl Ord for Package {
58    fn cmp(&self, other: &Package) -> Ordering {
59        self.package_id().cmp(&other.package_id())
60    }
61}
62
63impl PartialOrd for Package {
64    fn partial_cmp(&self, other: &Package) -> Option<Ordering> {
65        Some(self.cmp(other))
66    }
67}
68
/// A Package in a form where `Serialize` can be derived.
///
/// Built by [`Package::serialized`]; field names here define the JSON
/// output format, so they must not be renamed casually.
#[derive(Serialize)]
pub struct SerializedPackage {
    name: InternedString,
    version: Version,
    id: PackageIdSpec,
    license: Option<String>,
    license_file: Option<String>,
    description: Option<String>,
    source: SourceId,
    dependencies: Vec<SerializedDependency>,
    targets: Vec<Target>,
    features: BTreeMap<InternedString, Vec<InternedString>>,
    manifest_path: PathBuf,
    metadata: Option<toml::Value>,
    publish: Option<Vec<String>>,
    authors: Vec<String>,
    categories: Vec<String>,
    keywords: Vec<String>,
    readme: Option<String>,
    repository: Option<String>,
    homepage: Option<String>,
    documentation: Option<String>,
    edition: String,
    links: Option<String>,
    /// Omitted from the serialized output entirely when `None`
    /// (rather than emitted as `null`).
    #[serde(skip_serializing_if = "Option::is_none")]
    metabuild: Option<Vec<String>>,
    default_run: Option<String>,
    rust_version: Option<RustVersion>,
    /// Omitted when `None`, same as `metabuild`.
    #[serde(skip_serializing_if = "Option::is_none")]
    hints: Option<Hints>,
}
101
102impl Package {
103    /// Creates a package from a manifest and its location.
104    pub fn new(manifest: Manifest, manifest_path: &Path) -> Package {
105        Package {
106            inner: Rc::new(PackageInner {
107                manifest,
108                manifest_path: manifest_path.to_path_buf(),
109            }),
110        }
111    }
112
113    /// Gets the manifest dependencies.
114    pub fn dependencies(&self) -> &[Dependency] {
115        self.manifest().dependencies()
116    }
117    /// Gets the manifest.
118    pub fn manifest(&self) -> &Manifest {
119        &self.inner.manifest
120    }
121    /// Gets the manifest.
122    pub fn manifest_mut(&mut self) -> &mut Manifest {
123        &mut Rc::make_mut(&mut self.inner).manifest
124    }
125    /// Gets the path to the manifest.
126    pub fn manifest_path(&self) -> &Path {
127        &self.inner.manifest_path
128    }
129    /// Gets the name of the package.
130    pub fn name(&self) -> InternedString {
131        self.package_id().name()
132    }
133    /// Gets the `PackageId` object for the package (fully defines a package).
134    pub fn package_id(&self) -> PackageId {
135        self.manifest().package_id()
136    }
137    /// Gets the root folder of the package.
138    pub fn root(&self) -> &Path {
139        self.manifest_path().parent().unwrap()
140    }
141    /// Gets the summary for the package.
142    pub fn summary(&self) -> &Summary {
143        self.manifest().summary()
144    }
145    /// Gets the targets specified in the manifest.
146    pub fn targets(&self) -> &[Target] {
147        self.manifest().targets()
148    }
149    /// Gets the library crate for this package, if it exists.
150    pub fn library(&self) -> Option<&Target> {
151        self.targets().iter().find(|t| t.is_lib())
152    }
153    /// Gets the current package version.
154    pub fn version(&self) -> &Version {
155        self.package_id().version()
156    }
157    /// Gets the package authors.
158    pub fn authors(&self) -> &Vec<String> {
159        &self.manifest().metadata().authors
160    }
161
162    /// Returns `None` if the package is set to publish.
163    /// Returns `Some(allowed_registries)` if publishing is limited to specified
164    /// registries or if package is set to not publish.
165    pub fn publish(&self) -> &Option<Vec<String>> {
166        self.manifest().publish()
167    }
168    /// Returns `true` if this package is a proc-macro.
169    pub fn proc_macro(&self) -> bool {
170        self.targets().iter().any(|target| target.proc_macro())
171    }
172    /// Gets the package's minimum Rust version.
173    pub fn rust_version(&self) -> Option<&RustVersion> {
174        self.manifest().rust_version()
175    }
176
177    /// Gets the package's hints.
178    pub fn hints(&self) -> Option<&Hints> {
179        self.manifest().hints()
180    }
181
182    /// Returns `true` if the package uses a custom build script for any target.
183    pub fn has_custom_build(&self) -> bool {
184        self.targets().iter().any(|t| t.is_custom_build())
185    }
186
187    pub fn map_source(self, to_replace: SourceId, replace_with: SourceId) -> Package {
188        Package {
189            inner: Rc::new(PackageInner {
190                manifest: self.manifest().clone().map_source(to_replace, replace_with),
191                manifest_path: self.manifest_path().to_owned(),
192            }),
193        }
194    }
195
196    pub fn serialized(
197        &self,
198        unstable_flags: &CliUnstable,
199        cargo_features: &Features,
200    ) -> SerializedPackage {
201        let summary = self.manifest().summary();
202        let package_id = summary.package_id();
203        let manmeta = self.manifest().metadata();
204        // Filter out metabuild targets. They are an internal implementation
205        // detail that is probably not relevant externally. There's also not a
206        // real path to show in `src_path`, and this avoids changing the format.
207        let targets: Vec<Target> = self
208            .manifest()
209            .targets()
210            .iter()
211            .filter(|t| t.src_path().is_path())
212            .cloned()
213            .collect();
214        // Convert Vec<FeatureValue> to Vec<InternedString>
215        let crate_features = summary
216            .features()
217            .iter()
218            .map(|(k, v)| (*k, v.iter().map(|fv| fv.to_string().into()).collect()))
219            .collect();
220
221        SerializedPackage {
222            name: package_id.name(),
223            version: package_id.version().clone(),
224            id: package_id.to_spec(),
225            license: manmeta.license.clone(),
226            license_file: manmeta.license_file.clone(),
227            description: manmeta.description.clone(),
228            source: summary.source_id(),
229            dependencies: summary
230                .dependencies()
231                .iter()
232                .map(|dep| dep.serialized(unstable_flags, cargo_features))
233                .collect(),
234            targets,
235            features: crate_features,
236            manifest_path: self.manifest_path().to_path_buf(),
237            metadata: self.manifest().custom_metadata().cloned(),
238            authors: manmeta.authors.clone(),
239            categories: manmeta.categories.clone(),
240            keywords: manmeta.keywords.clone(),
241            readme: manmeta.readme.clone(),
242            repository: manmeta.repository.clone(),
243            homepage: manmeta.homepage.clone(),
244            documentation: manmeta.documentation.clone(),
245            edition: self.manifest().edition().to_string(),
246            links: self.manifest().links().map(|s| s.to_owned()),
247            metabuild: self.manifest().metabuild().cloned(),
248            publish: self.publish().as_ref().cloned(),
249            default_run: self.manifest().default_run().map(|s| s.to_owned()),
250            rust_version: self.rust_version().cloned(),
251            hints: self.hints().cloned(),
252        }
253    }
254}
255
256impl fmt::Display for Package {
257    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258        write!(f, "{}", self.summary().package_id())
259    }
260}
261
262impl fmt::Debug for Package {
263    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
264        f.debug_struct("Package")
265            .field("id", &self.summary().package_id())
266            .field("..", &"..")
267            .finish()
268    }
269}
270
271impl PartialEq for Package {
272    fn eq(&self, other: &Package) -> bool {
273        self.package_id() == other.package_id()
274    }
275}
276
277impl Eq for Package {}
278
279impl hash::Hash for Package {
280    fn hash<H: hash::Hasher>(&self, into: &mut H) {
281        self.package_id().hash(into)
282    }
283}
284
/// A set of packages, with the intent to download.
///
/// This is primarily used to convert a set of `PackageId`s to `Package`s. It
/// will download as needed, or use the cached download if available.
pub struct PackageSet<'gctx> {
    /// Every id this set knows about, each with a slot that is lazily
    /// filled in once the corresponding package is available.
    packages: HashMap<PackageId, LazyCell<Package>>,
    /// The sources that packages are obtained from.
    sources: RefCell<SourceMap<'gctx>>,
    /// Cargo's global context/configuration.
    gctx: &'gctx GlobalContext,
    /// The curl `Multi` handle used to drive concurrent transfers.
    multi: Multi,
    /// Used to prevent reusing the `PackageSet` to download twice.
    downloading: Cell<bool>,
    /// Whether or not to use curl HTTP/2 multiplexing.
    multiplexing: bool,
}
299
/// Helper for downloading crates.
pub struct Downloads<'a, 'gctx> {
    /// The `PackageSet` this session belongs to; holds the curl `Multi`
    /// handle and the per-package slots that get filled in as downloads
    /// complete.
    set: &'a PackageSet<'gctx>,
    /// When a download is started, it is added to this map. The key is a
    /// "token" (see `Download::token`). It is removed once the download is
    /// finished.
    pending: HashMap<usize, (Download<'gctx>, EasyHandle)>,
    /// Set of packages currently being downloaded. This should stay in sync
    /// with `pending`.
    pending_ids: HashSet<PackageId>,
    /// Downloads that have failed and are waiting to retry again later.
    sleeping: SleepTracker<(Download<'gctx>, Easy)>,
    /// The final result of each download. A pair `(token, result)`. This is a
    /// temporary holding area, needed because curl can report multiple
    /// downloads at once, but the main loop (`wait`) is written to only
    /// handle one at a time.
    results: Vec<(usize, Result<(), curl::Error>)>,
    /// The next ID to use for creating a token (see `Download::token`).
    next: usize,
    /// Progress bar.
    progress: RefCell<Option<Progress<'gctx>>>,
    /// Number of downloads that have successfully finished.
    downloads_finished: usize,
    /// Total bytes for all successfully downloaded packages.
    downloaded_bytes: u64,
    /// Size (in bytes) and package name of the largest downloaded package.
    largest: (u64, InternedString),
    /// Time when downloading started.
    start: Instant,
    /// Indicates *all* downloads were successful.
    success: bool,

    /// Timeout management, both of timeout thresholds as well as whether or not
    /// our connection has timed out (and accompanying message if it has).
    ///
    /// Note that timeout management is done manually here instead of in libcurl
    /// because we want to apply timeouts to an entire batch of operations, not
    /// any one particular single operation.
    timeout: HttpTimeout,
    /// Last time bytes were received.
    updated_at: Cell<Instant>,
    /// This is a slow-speed check. It is reset to `now + timeout_duration`
    /// every time at least `threshold` bytes are received. If the current
    /// time ever exceeds `next_speed_check`, then give up and report a
    /// timeout error.
    next_speed_check: Cell<Instant>,
    /// This is the slow-speed threshold byte count. It starts at the
    /// configured threshold value (default 10), and is decremented by the
    /// number of bytes received in each chunk. If it is <= zero, the
    /// threshold has been met and data is being received fast enough not to
    /// trigger a timeout; reset `next_speed_check` and set this back to the
    /// configured threshold.
    next_speed_check_bytes_threshold: Cell<u64>,
    /// Global filesystem lock to ensure only one Cargo is downloading at a
    /// time.
    _lock: CacheLock<'gctx>,
}
357
struct Download<'gctx> {
    /// The token for this download, used as the key of the `Downloads::pending` map
    /// and stored in `EasyHandle` as well.
    token: usize,

    /// The package that we're downloading.
    id: PackageId,

    /// Actual downloaded data, updated throughout the lifetime of this download.
    data: RefCell<Vec<u8>>,

    /// HTTP headers for debugging.
    headers: RefCell<Vec<String>>,

    /// The URL that we're downloading from, cached here for error messages and
    /// reenqueuing.
    url: String,

    /// A descriptive string to print when we've finished downloading this crate.
    descriptor: String,

    /// Statistics updated from the progress callback in libcurl: `total` is
    /// the total transfer size and `current` the bytes received so far, as
    /// reported by curl's progress callback.
    total: Cell<u64>,
    current: Cell<u64>,

    /// The moment we started this transfer at.
    start: Instant,
    /// Descriptive message set once the transfer is deemed timed out; it is
    /// inspected when curl reports "aborted by callback".
    timed_out: Cell<Option<String>>,

    /// Logic used to track retrying this download if it's a spurious failure.
    retry: Retry<'gctx>,
}
390
391impl<'gctx> PackageSet<'gctx> {
392    pub fn new(
393        package_ids: &[PackageId],
394        sources: SourceMap<'gctx>,
395        gctx: &'gctx GlobalContext,
396    ) -> CargoResult<PackageSet<'gctx>> {
397        // We've enabled the `http2` feature of `curl` in Cargo, so treat
398        // failures here as fatal as it would indicate a build-time problem.
399        let mut multi = Multi::new();
400        let multiplexing = gctx.http_config()?.multiplexing.unwrap_or(true);
401        multi
402            .pipelining(false, multiplexing)
403            .context("failed to enable multiplexing/pipelining in curl")?;
404
405        // let's not flood crates.io with connections
406        multi.set_max_host_connections(2)?;
407
408        Ok(PackageSet {
409            packages: package_ids
410                .iter()
411                .map(|&id| (id, LazyCell::new()))
412                .collect(),
413            sources: RefCell::new(sources),
414            gctx,
415            multi,
416            downloading: Cell::new(false),
417            multiplexing,
418        })
419    }
420
421    pub fn package_ids(&self) -> impl Iterator<Item = PackageId> + '_ {
422        self.packages.keys().cloned()
423    }
424
425    pub fn packages(&self) -> impl Iterator<Item = &Package> {
426        self.packages.values().filter_map(|p| p.borrow())
427    }
428
429    pub fn enable_download<'a>(&'a self) -> CargoResult<Downloads<'a, 'gctx>> {
430        assert!(!self.downloading.replace(true));
431        let timeout = HttpTimeout::new(self.gctx)?;
432        Ok(Downloads {
433            start: Instant::now(),
434            set: self,
435            next: 0,
436            pending: HashMap::new(),
437            pending_ids: HashSet::new(),
438            sleeping: SleepTracker::new(),
439            results: Vec::new(),
440            progress: RefCell::new(Some(Progress::with_style(
441                "Downloading",
442                ProgressStyle::Ratio,
443                self.gctx,
444            ))),
445            downloads_finished: 0,
446            downloaded_bytes: 0,
447            largest: (0, "".into()),
448            success: false,
449            updated_at: Cell::new(Instant::now()),
450            timeout,
451            next_speed_check: Cell::new(Instant::now()),
452            next_speed_check_bytes_threshold: Cell::new(0),
453            _lock: self
454                .gctx
455                .acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?,
456        })
457    }
458
459    pub fn get_one(&self, id: PackageId) -> CargoResult<&Package> {
460        if let Some(pkg) = self.packages.get(&id).and_then(|slot| slot.borrow()) {
461            return Ok(pkg);
462        }
463        Ok(self.get_many(Some(id))?.remove(0))
464    }
465
466    pub fn get_many(&self, ids: impl IntoIterator<Item = PackageId>) -> CargoResult<Vec<&Package>> {
467        let mut pkgs = Vec::new();
468        let _lock = self
469            .gctx
470            .acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?;
471        let mut downloads = self.enable_download()?;
472        for id in ids {
473            pkgs.extend(downloads.start(id)?);
474        }
475        while downloads.remaining() > 0 {
476            pkgs.push(downloads.wait()?);
477        }
478        downloads.success = true;
479        drop(downloads);
480
481        let mut deferred = self.gctx.deferred_global_last_use()?;
482        deferred.save_no_error(self.gctx);
483        Ok(pkgs)
484    }
485
486    /// Downloads any packages accessible from the give root ids.
487    #[tracing::instrument(skip_all)]
488    pub fn download_accessible(
489        &self,
490        resolve: &Resolve,
491        root_ids: &[PackageId],
492        has_dev_units: HasDevUnits,
493        requested_kinds: &[CompileKind],
494        target_data: &RustcTargetData<'gctx>,
495        force_all_targets: ForceAllTargets,
496    ) -> CargoResult<()> {
497        fn collect_used_deps(
498            used: &mut BTreeSet<(PackageId, CompileKind)>,
499            resolve: &Resolve,
500            pkg_id: PackageId,
501            has_dev_units: HasDevUnits,
502            requested_kind: CompileKind,
503            target_data: &RustcTargetData<'_>,
504            force_all_targets: ForceAllTargets,
505        ) -> CargoResult<()> {
506            if !used.insert((pkg_id, requested_kind)) {
507                return Ok(());
508            }
509            let requested_kinds = &[requested_kind];
510            let filtered_deps = PackageSet::filter_deps(
511                pkg_id,
512                resolve,
513                has_dev_units,
514                requested_kinds,
515                target_data,
516                force_all_targets,
517            );
518            for (pkg_id, deps) in filtered_deps {
519                collect_used_deps(
520                    used,
521                    resolve,
522                    pkg_id,
523                    has_dev_units,
524                    requested_kind,
525                    target_data,
526                    force_all_targets,
527                )?;
528                let artifact_kinds = deps.iter().filter_map(|dep| {
529                    Some(
530                        dep.artifact()?
531                            .target()?
532                            .to_resolved_compile_kind(*requested_kinds.iter().next().unwrap()),
533                    )
534                });
535                for artifact_kind in artifact_kinds {
536                    collect_used_deps(
537                        used,
538                        resolve,
539                        pkg_id,
540                        has_dev_units,
541                        artifact_kind,
542                        target_data,
543                        force_all_targets,
544                    )?;
545                }
546            }
547            Ok(())
548        }
549
550        // This is sorted by PackageId to get consistent behavior and error
551        // messages for Cargo's testsuite. Perhaps there is a better ordering
552        // that optimizes download time?
553        let mut to_download = BTreeSet::new();
554
555        for id in root_ids {
556            for requested_kind in requested_kinds {
557                collect_used_deps(
558                    &mut to_download,
559                    resolve,
560                    *id,
561                    has_dev_units,
562                    *requested_kind,
563                    target_data,
564                    force_all_targets,
565                )?;
566            }
567        }
568        let to_download = to_download
569            .into_iter()
570            .map(|(p, _)| p)
571            .collect::<BTreeSet<_>>();
572        self.get_many(to_download.into_iter())?;
573        Ok(())
574    }
575
576    /// Check if there are any dependency packages that violate artifact constraints
577    /// to instantly abort, or that do not have any libs which results in warnings.
578    pub(crate) fn warn_no_lib_packages_and_artifact_libs_overlapping_deps(
579        &self,
580        ws: &Workspace<'gctx>,
581        resolve: &Resolve,
582        root_ids: &[PackageId],
583        has_dev_units: HasDevUnits,
584        requested_kinds: &[CompileKind],
585        target_data: &RustcTargetData<'_>,
586        force_all_targets: ForceAllTargets,
587    ) -> CargoResult<()> {
588        let no_lib_pkgs: BTreeMap<PackageId, Vec<(&Package, &HashSet<Dependency>)>> = root_ids
589            .iter()
590            .map(|&root_id| {
591                let dep_pkgs_to_deps: Vec<_> = PackageSet::filter_deps(
592                    root_id,
593                    resolve,
594                    has_dev_units,
595                    requested_kinds,
596                    target_data,
597                    force_all_targets,
598                )
599                .collect();
600
601                let dep_pkgs_and_deps = dep_pkgs_to_deps
602                    .into_iter()
603                    .filter(|(_id, deps)| deps.iter().any(|dep| dep.maybe_lib()))
604                    .filter_map(|(dep_package_id, deps)| {
605                        self.get_one(dep_package_id).ok().and_then(|dep_pkg| {
606                            (!dep_pkg.targets().iter().any(|t| t.is_lib())).then(|| (dep_pkg, deps))
607                        })
608                    })
609                    .collect();
610                (root_id, dep_pkgs_and_deps)
611            })
612            .collect();
613
614        for (pkg_id, dep_pkgs) in no_lib_pkgs {
615            for (_dep_pkg_without_lib_target, deps) in dep_pkgs {
616                for dep in deps.iter().filter(|dep| {
617                    dep.artifact()
618                        .map(|artifact| artifact.is_lib())
619                        .unwrap_or(true)
620                }) {
621                    ws.gctx().shell().warn(&format!(
622                        "{} ignoring invalid dependency `{}` which is missing a lib target",
623                        pkg_id,
624                        dep.name_in_toml(),
625                    ))?;
626                }
627            }
628        }
629        Ok(())
630    }
631
632    fn filter_deps<'a>(
633        pkg_id: PackageId,
634        resolve: &'a Resolve,
635        has_dev_units: HasDevUnits,
636        requested_kinds: &'a [CompileKind],
637        target_data: &'a RustcTargetData<'_>,
638        force_all_targets: ForceAllTargets,
639    ) -> impl Iterator<Item = (PackageId, &'a HashSet<Dependency>)> + 'a {
640        resolve
641            .deps(pkg_id)
642            .filter(move |&(_id, deps)| {
643                deps.iter().any(|dep| {
644                    if dep.kind() == DepKind::Development && has_dev_units == HasDevUnits::No {
645                        return false;
646                    }
647                    if force_all_targets == ForceAllTargets::No {
648                        let activated = requested_kinds
649                            .iter()
650                            .chain(Some(&CompileKind::Host))
651                            .any(|kind| target_data.dep_platform_activated(dep, *kind));
652                        if !activated {
653                            return false;
654                        }
655                    }
656                    true
657                })
658            })
659            .into_iter()
660    }
661
662    pub fn sources(&self) -> Ref<'_, SourceMap<'gctx>> {
663        self.sources.borrow()
664    }
665
666    pub fn sources_mut(&self) -> RefMut<'_, SourceMap<'gctx>> {
667        self.sources.borrow_mut()
668    }
669
670    /// Merge the given set into self.
671    pub fn add_set(&mut self, set: PackageSet<'gctx>) {
672        assert!(!self.downloading.get());
673        assert!(!set.downloading.get());
674        for (pkg_id, p_cell) in set.packages {
675            self.packages.entry(pkg_id).or_insert(p_cell);
676        }
677        let mut sources = self.sources.borrow_mut();
678        let other_sources = set.sources.into_inner();
679        sources.add_source_map(other_sources);
680    }
681}
682
683impl<'a, 'gctx> Downloads<'a, 'gctx> {
    /// Starts to download the package for the `id` specified.
    ///
    /// Returns `None` if the package is queued up for download and will
    /// eventually be returned from `wait`. Returns `Some(pkg)` if
    /// the package is ready and doesn't need to be downloaded.
    #[tracing::instrument(skip_all)]
    pub fn start(&mut self, id: PackageId) -> CargoResult<Option<&'a Package>> {
        // Attach the package id to any failure so the user knows which
        // download went wrong.
        self.start_inner(id)
            .with_context(|| format!("failed to download `{}`", id))
    }
694
    /// Does the actual work of `start`: consults the cache and the source,
    /// and if a real HTTP download is needed, configures an `Easy` handle
    /// and enqueues it on the shared curl `Multi` handle.
    fn start_inner(&mut self, id: PackageId) -> CargoResult<Option<&'a Package>> {
        // First up see if we've already cached this package, in which case
        // there's nothing to do.
        let slot = self
            .set
            .packages
            .get(&id)
            .ok_or_else(|| internal(format!("couldn't find `{}` in package set", id)))?;
        if let Some(pkg) = slot.borrow() {
            return Ok(Some(pkg));
        }

        // Ask the original source for this `PackageId` for the corresponding
        // package. That may immediately come back and tell us that the package
        // is ready, or it could tell us that it needs to be downloaded.
        let mut sources = self.set.sources.borrow_mut();
        let source = sources
            .get_mut(id.source_id())
            .ok_or_else(|| internal(format!("couldn't find source for `{}`", id)))?;
        let pkg = source
            .download(id)
            .context("unable to get packages from source")?;
        let (url, descriptor, authorization) = match pkg {
            MaybePackage::Ready(pkg) => {
                debug!("{} doesn't need a download", id);
                assert!(slot.fill(pkg).is_ok());
                return Ok(Some(slot.borrow().unwrap()));
            }
            MaybePackage::Download {
                url,
                descriptor,
                authorization,
            } => (url, descriptor, authorization),
        };

        // Ok we're going to download this crate, so let's set up all our
        // internal state and hand off an `Easy` handle to our libcurl `Multi`
        // handle. This won't actually start the transfer, but later it'll
        // happen during `wait`.
        let token = self.next;
        self.next += 1;
        debug!(target: "network", "downloading {} as {}", id, token);
        assert!(self.pending_ids.insert(id));

        let (mut handle, _timeout) = http_handle_and_timeout(self.set.gctx)?;
        handle.get(true)?;
        handle.url(&url)?;
        handle.follow_location(true)?; // follow redirects

        // Add authorization header.
        if let Some(authorization) = authorization {
            let mut headers = curl::easy::List::new();
            headers.append(&format!("Authorization: {}", authorization))?;
            handle.http_headers(headers)?;
        }

        // Enable HTTP/2 if possible.
        crate::try_old_curl_http2_pipewait!(self.set.multiplexing, handle);

        // These `move` callbacks can't borrow `self`, so they reach the
        // in-progress `Downloads` through thread-local storage (`tls::with`),
        // looking up their `Download` entry by `token`.
        handle.write_function(move |buf| {
            debug!(target: "network", "{} - {} bytes of data", token, buf.len());
            tls::with(|downloads| {
                if let Some(downloads) = downloads {
                    downloads.pending[&token]
                        .0
                        .data
                        .borrow_mut()
                        .extend_from_slice(buf);
                }
            });
            Ok(buf.len())
        })?;
        handle.header_function(move |data| {
            tls::with(|downloads| {
                if let Some(downloads) = downloads {
                    // Headers contain trailing \r\n, trim them to make it easier
                    // to work with.
                    let h = String::from_utf8_lossy(data).trim().to_string();
                    downloads.pending[&token].0.headers.borrow_mut().push(h);
                }
            });
            true
        })?;

        handle.progress(true)?;
        handle.progress_function(move |dl_total, dl_cur, _, _| {
            tls::with(|downloads| match downloads {
                Some(d) => d.progress(token, dl_total as u64, dl_cur as u64),
                None => false,
            })
        })?;

        // If the progress bar isn't enabled then it may be awhile before the
        // first crate finishes downloading so we inform immediately that we're
        // downloading crates here.
        if self.downloads_finished == 0
            && self.pending.is_empty()
            && !self.progress.borrow().as_ref().unwrap().is_enabled()
        {
            self.set.gctx.shell().status("Downloading", "crates ...")?;
        }

        let dl = Download {
            token,
            data: RefCell::new(Vec::new()),
            headers: RefCell::new(Vec::new()),
            id,
            url,
            descriptor,
            total: Cell::new(0),
            current: Cell::new(0),
            start: Instant::now(),
            timed_out: Cell::new(None),
            retry: Retry::new(self.set.gctx)?,
        };
        self.enqueue(dl, handle)?;
        self.tick(WhyTick::DownloadStarted)?;

        Ok(None)
    }
815
816    /// Returns the number of crates that are still downloading.
817    pub fn remaining(&self) -> usize {
818        self.pending.len() + self.sleeping.len()
819    }
820
    /// Blocks the current thread waiting for a package to finish downloading.
    ///
    /// This method will wait for a previously enqueued package to finish
    /// downloading and return a reference to it after it's done downloading.
    ///
    /// # Panics
    ///
    /// This function will panic if there are no remaining downloads.
    #[tracing::instrument(skip_all)]
    pub fn wait(&mut self) -> CargoResult<&'a Package> {
        let (dl, data) = loop {
            // Invariant: every in-flight transfer has a matching entry in
            // `pending_ids`; a mismatch means bookkeeping got out of sync.
            assert_eq!(self.pending.len(), self.pending_ids.len());
            let (token, result) = self.wait_for_curl()?;
            debug!(target: "network", "{} finished with {:?}", token, result);

            let (mut dl, handle) = self
                .pending
                .remove(&token)
                .expect("got a token for a non-in-progress transfer");
            // Take ownership of the buffered response body and headers; on a
            // retry the download starts over with empty buffers.
            let data = mem::take(&mut *dl.data.borrow_mut());
            let headers = mem::take(&mut *dl.headers.borrow_mut());
            // Detach the easy handle from the `Multi` so it can be inspected
            // here and possibly re-added on retry via `enqueue`.
            let mut handle = self.set.multi.remove(handle)?;
            self.pending_ids.remove(&dl.id);

            // Check if this was a spurious error. If it was a spurious error
            // then we want to re-enqueue our request for another attempt and
            // then we wait for another request to finish.
            let ret = {
                let timed_out = &dl.timed_out;
                let url = &dl.url;
                dl.retry.r#try(|| {
                    if let Err(e) = result {
                        // If this error is "aborted by callback" then that's
                        // probably because our progress callback aborted due to
                        // a timeout. We'll find out by looking at the
                        // `timed_out` field, looking for a descriptive message.
                        // If one is found we switch the error code (to ensure
                        // it's flagged as spurious) and then attach our extra
                        // information to the error.
                        if !e.is_aborted_by_callback() {
                            return Err(e.into());
                        }

                        return Err(match timed_out.replace(None) {
                            Some(msg) => {
                                let code = curl_sys::CURLE_OPERATION_TIMEDOUT;
                                let mut err = curl::Error::new(code);
                                err.set_extra(msg);
                                err
                            }
                            None => e,
                        }
                        .into());
                    }

                    // NOTE(review): `0` is accepted alongside `200` —
                    // presumably for transports where curl reports no HTTP
                    // status at all; confirm before tightening this check.
                    let code = handle.response_code()?;
                    if code != 200 && code != 0 {
                        return Err(HttpNotSuccessful::new_from_handle(
                            &mut handle,
                            &url,
                            data,
                            headers,
                        )
                        .into());
                    }
                    Ok(data)
                })
            };
            match ret {
                RetryResult::Success(data) => break (dl, data),
                RetryResult::Err(e) => {
                    return Err(e.context(format!("failed to download from `{}`", dl.url)));
                }
                RetryResult::Retry(sleep) => {
                    // Park this download; `add_sleepers` (called from
                    // `wait_for_curl`) re-enqueues it once the backoff passes.
                    debug!(target: "network", "download retry {} for {sleep}ms", dl.url);
                    self.sleeping.push(sleep, (dl, handle));
                }
            }
        };

        // If the progress bar isn't enabled then we still want to provide some
        // semblance of progress of how we're downloading crates, and if the
        // progress bar is enabled this provides a good log of what's happening.
        self.progress.borrow_mut().as_mut().unwrap().clear();
        self.set.gctx.shell().status("Downloaded", &dl.descriptor)?;

        // Update summary statistics reported by `Drop for Downloads`.
        self.downloads_finished += 1;
        self.downloaded_bytes += dl.total.get();
        if dl.total.get() > self.largest.0 {
            self.largest = (dl.total.get(), dl.id.name());
        }

        // We're about to synchronously extract the crate below. While we're
        // doing that our download progress won't actually be updated, nor do we
        // have a great view into the progress of the extraction. Let's prepare
        // the user for this CPU-heavy step if it looks like it'll take some
        // time to do so.
        let kib_400 = 1024 * 400;
        if dl.total.get() < kib_400 {
            self.tick(WhyTick::DownloadFinished)?;
        } else {
            self.tick(WhyTick::Extracting(&dl.id.name()))?;
        }

        // Inform the original source that the download is finished which
        // should allow us to actually get the package and fill it in now.
        let mut sources = self.set.sources.borrow_mut();
        let source = sources
            .get_mut(dl.id.source_id())
            .ok_or_else(|| internal(format!("couldn't find source for `{}`", dl.id)))?;
        let start = Instant::now();
        let pkg = source.finish_download(dl.id, data)?;

        // Assume that no time has passed while we were calling
        // `finish_download`, update all speed checks and timeout limits of all
        // active downloads to make sure they don't fire because of a slowly
        // extracted tarball.
        let finish_dur = start.elapsed();
        self.updated_at.set(self.updated_at.get() + finish_dur);
        self.next_speed_check
            .set(self.next_speed_check.get() + finish_dur);

        // Cache the finished package in its pre-created slot; filling should
        // never fail since each package is downloaded at most once.
        let slot = &self.set.packages[&dl.id];
        assert!(slot.fill(pkg).is_ok());
        Ok(slot.borrow().unwrap())
    }
947
948    fn enqueue(&mut self, dl: Download<'gctx>, handle: Easy) -> CargoResult<()> {
949        let mut handle = self.set.multi.add(handle)?;
950        let now = Instant::now();
951        handle.set_token(dl.token)?;
952        self.updated_at.set(now);
953        self.next_speed_check.set(now + self.timeout.dur);
954        self.next_speed_check_bytes_threshold
955            .set(u64::from(self.timeout.low_speed_limit));
956        dl.timed_out.set(None);
957        dl.current.set(0);
958        dl.total.set(0);
959        self.pending.insert(dl.token, (dl, handle));
960        Ok(())
961    }
962
    /// Block, waiting for curl. Returns a token and a `Result` for that token
    /// (`Ok` means the download successfully finished).
    fn wait_for_curl(&mut self) -> CargoResult<(usize, Result<(), curl::Error>)> {
        // This is the main workhorse loop. We use libcurl's portable `wait`
        // method to actually perform blocking. This isn't necessarily too
        // efficient in terms of fd management, but we should only be juggling
        // a few anyway.
        //
        // Here we start off by asking the `multi` handle to do some work via
        // the `perform` method. This will actually do I/O work (non-blocking)
        // and attempt to make progress. Afterwards we ask about the `messages`
        // contained in the handle which will inform us if anything has finished
        // transferring.
        //
        // If we've got a finished transfer after all that work we break out
        // and process the finished transfer at the end. Otherwise we need to
        // actually block waiting for I/O to happen, which we achieve with the
        // `wait` method on `multi`.
        loop {
            // Re-enqueue any retried downloads whose backoff has elapsed.
            self.add_sleepers()?;
            // Expose `self` to the curl header/progress callbacks via
            // thread-local storage for the duration of `perform` (see the
            // `tls` module at the bottom of this file).
            let n = tls::set(self, || {
                self.set
                    .multi
                    .perform()
                    .context("failed to perform http requests")
            })?;
            debug!(target: "network", "handles remaining: {}", n);
            let results = &mut self.results;
            let pending = &self.pending;
            self.set.multi.messages(|msg| {
                let token = msg.token().expect("failed to read token");
                let handle = &pending[&token].1;
                if let Some(result) = msg.result_for(handle) {
                    results.push((token, result));
                } else {
                    debug!(target: "network", "message without a result (?)");
                }
            });

            if let Some(pair) = results.pop() {
                break Ok(pair);
            }
            // Nothing finished this round; there must still be work left
            // (active or sleeping) or we'd block forever below.
            assert_ne!(self.remaining(), 0);
            if self.pending.is_empty() {
                // Everything is parked for a retry backoff: sleep until the
                // soonest one is due (`unwrap` is fine — `remaining() != 0`
                // with no pending transfers implies `sleeping` is non-empty).
                let delay = self.sleeping.time_to_next().unwrap();
                debug!(target: "network", "sleeping main thread for {delay:?}");
                std::thread::sleep(delay);
            } else {
                // Despite its name, `min_timeout` acts as an upper bound here:
                // `timeout.min(min_timeout)` caps the wait at 1s so the loop
                // wakes up regularly (e.g. to service `sleeping` retries).
                let min_timeout = Duration::new(1, 0);
                let timeout = self.set.multi.get_timeout()?.unwrap_or(min_timeout);
                let timeout = timeout.min(min_timeout);
                self.set
                    .multi
                    .wait(&mut [], timeout)
                    .context("failed to wait on curl `Multi`")?;
            }
        }
    }
1021
1022    fn add_sleepers(&mut self) -> CargoResult<()> {
1023        for (dl, handle) in self.sleeping.to_retry() {
1024            self.pending_ids.insert(dl.id);
1025            self.enqueue(dl, handle)?;
1026        }
1027        Ok(())
1028    }
1029
    /// Progress callback for an individual transfer, invoked through the
    /// `progress_function` registered on each easy handle.
    ///
    /// Updates the per-download byte counters and the shared speed-check
    /// state. Returning `false` tells curl to abort the transfer; `wait`
    /// later translates the resulting "aborted by callback" error using the
    /// message stored in `timed_out`.
    fn progress(&self, token: usize, total: u64, cur: u64) -> bool {
        let dl = &self.pending[&token].0;
        dl.total.set(total);
        let now = Instant::now();
        // Only react to forward progress; curl may invoke this callback
        // without any new bytes having arrived.
        if cur > dl.current.get() {
            let delta = cur - dl.current.get();
            let threshold = self.next_speed_check_bytes_threshold.get();

            dl.current.set(cur);
            self.updated_at.set(now);

            if delta >= threshold {
                // Enough data arrived: push the next speed check out by a
                // full timeout window and reset the byte threshold.
                self.next_speed_check.set(now + self.timeout.dur);
                self.next_speed_check_bytes_threshold
                    .set(u64::from(self.timeout.low_speed_limit));
            } else {
                // Partial credit: shrink the threshold by what we received.
                self.next_speed_check_bytes_threshold.set(threshold - delta);
            }
        }
        if self.tick(WhyTick::DownloadUpdate).is_err() {
            return false;
        }

        // If we've spent too long not actually receiving any data we time out.
        if now > self.updated_at.get() + self.timeout.dur {
            self.updated_at.set(now);
            let msg = format!(
                "failed to download any data for `{}` within {}s",
                dl.id,
                self.timeout.dur.as_secs()
            );
            // Stash the message for `wait` to attach to the curl error, then
            // abort the transfer by returning `false`.
            dl.timed_out.set(Some(msg));
            return false;
        }

        // If we reached the point in time that we need to check our speed
        // limit, see if we've transferred enough data during this threshold. If
        // it fails this check then we fail because the download is going too
        // slowly.
        if now >= self.next_speed_check.get() {
            self.next_speed_check.set(now + self.timeout.dur);
            assert!(self.next_speed_check_bytes_threshold.get() > 0);
            let msg = format!(
                "download of `{}` failed to transfer more \
                 than {} bytes in {}s",
                dl.id,
                self.timeout.low_speed_limit,
                self.timeout.dur.as_secs()
            );
            dl.timed_out.set(Some(msg));
            return false;
        }

        true
    }
1085
1086    fn tick(&self, why: WhyTick<'_>) -> CargoResult<()> {
1087        let mut progress = self.progress.borrow_mut();
1088        let progress = progress.as_mut().unwrap();
1089
1090        if let WhyTick::DownloadUpdate = why {
1091            if !progress.update_allowed() {
1092                return Ok(());
1093            }
1094        }
1095        let pending = self.remaining();
1096        let mut msg = if pending == 1 {
1097            format!("{} crate", pending)
1098        } else {
1099            format!("{} crates", pending)
1100        };
1101        match why {
1102            WhyTick::Extracting(krate) => {
1103                msg.push_str(&format!(", extracting {} ...", krate));
1104            }
1105            _ => {
1106                let mut dur = Duration::new(0, 0);
1107                let mut remaining = 0;
1108                for (dl, _) in self.pending.values() {
1109                    dur += dl.start.elapsed();
1110                    // If the total/current look weird just throw out the data
1111                    // point, sounds like curl has more to learn before we have
1112                    // the true information.
1113                    if dl.total.get() >= dl.current.get() {
1114                        remaining += dl.total.get() - dl.current.get();
1115                    }
1116                }
1117                if remaining > 0 && dur > Duration::from_millis(500) {
1118                    msg.push_str(&format!(", remaining bytes: {:.1}", HumanBytes(remaining)));
1119                }
1120            }
1121        }
1122        progress.print_now(&msg)
1123    }
1124}
1125
/// The reason `Downloads::tick` was asked to redraw the progress line; used
/// to tailor the message and to decide whether the update may be throttled.
#[derive(Copy, Clone)]
enum WhyTick<'a> {
    /// A new download was just enqueued.
    DownloadStarted,
    /// Periodic progress callback from curl; these ticks may be skipped if
    /// the progress bar isn't ready for another redraw.
    DownloadUpdate,
    /// A download finished (the small-download case; larger ones report
    /// `Extracting` instead).
    DownloadFinished,
    /// A downloaded crate is being synchronously extracted; the payload is
    /// the crate's name.
    Extracting(&'a str),
}
1133
1134impl<'a, 'gctx> Drop for Downloads<'a, 'gctx> {
1135    fn drop(&mut self) {
1136        self.set.downloading.set(false);
1137        let progress = self.progress.get_mut().take().unwrap();
1138        // Don't print a download summary if we're not using a progress bar,
1139        // we've already printed lots of `Downloading...` items.
1140        if !progress.is_enabled() {
1141            return;
1142        }
1143        // If we didn't download anything, no need for a summary.
1144        if self.downloads_finished == 0 {
1145            return;
1146        }
1147        // If an error happened, let's not clutter up the output.
1148        if !self.success {
1149            return;
1150        }
1151        // pick the correct plural of crate(s)
1152        let crate_string = if self.downloads_finished == 1 {
1153            "crate"
1154        } else {
1155            "crates"
1156        };
1157        let mut status = format!(
1158            "{} {} ({:.1}) in {}",
1159            self.downloads_finished,
1160            crate_string,
1161            HumanBytes(self.downloaded_bytes),
1162            util::elapsed(self.start.elapsed())
1163        );
1164        // print the size of largest crate if it was >1mb
1165        // however don't print if only a single crate was downloaded
1166        // because it is obvious that it will be the largest then
1167        let mib_1 = 1024 * 1024;
1168        if self.largest.0 > mib_1 && self.downloads_finished > 1 {
1169            status.push_str(&format!(
1170                " (largest was `{}` at {:.1})",
1171                self.largest.1,
1172                HumanBytes(self.largest.0),
1173            ));
1174        }
1175        // Clear progress before displaying final summary.
1176        drop(progress);
1177        drop(self.set.gctx.shell().status("Downloaded", status));
1178    }
1179}
1180
mod tls {
    //! Thread-local storage used to smuggle a reference to the current
    //! `Downloads` instance into the curl header/progress callbacks, which
    //! cannot capture it directly.

    use std::cell::Cell;

    use super::Downloads;

    // Holds the `Downloads` reference as a raw address; `0` means "unset".
    thread_local!(static PTR: Cell<usize> = const { Cell::new(0) });

    /// Runs `f` with the `Downloads` currently registered via [`set`], or
    /// with `None` when called outside of a `set` scope.
    pub(crate) fn with<R>(f: impl FnOnce(Option<&Downloads<'_, '_>>) -> R) -> R {
        let ptr = PTR.with(|p| p.get());
        if ptr == 0 {
            f(None)
        } else {
            // SAFETY: a non-zero `PTR` is only ever written by `set`, which
            // stores the address of a live `&Downloads` and restores the
            // previous value (via its `Reset` drop guard) before that borrow
            // ends, so the pointer is valid for the duration of this call.
            unsafe { f(Some(&*(ptr as *const Downloads<'_, '_>))) }
        }
    }

    /// Registers `dl` in thread-local storage for the duration of `f`,
    /// restoring the previous value afterwards (including on unwind).
    pub(crate) fn set<R>(dl: &Downloads<'_, '_>, f: impl FnOnce() -> R) -> R {
        // Drop guard that restores the cell to its saved value, so nested
        // `set` calls and panics both leave `PTR` consistent.
        struct Reset<'a, T: Copy>(&'a Cell<T>, T);

        impl<'a, T: Copy> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                self.0.set(self.1);
            }
        }

        PTR.with(|p| {
            let _reset = Reset(p, p.get());
            p.set(dl as *const Downloads<'_, '_> as usize);
            f()
        })
    }
}