// cargo/core/package.rs

1use std::cell::{Cell, Ref, RefCell, RefMut};
2use std::cmp::Ordering;
3use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
4use std::fmt;
5use std::hash;
6use std::mem;
7use std::path::{Path, PathBuf};
8use std::rc::Rc;
9use std::time::{Duration, Instant};
10
11use anyhow::Context as _;
12use cargo_util_schemas::manifest::RustVersion;
13use curl::easy::Easy;
14use curl::multi::{EasyHandle, Multi};
15use lazycell::LazyCell;
16use semver::Version;
17use serde::Serialize;
18use tracing::debug;
19
20use crate::core::compiler::{CompileKind, RustcTargetData};
21use crate::core::dependency::DepKind;
22use crate::core::resolver::features::ForceAllTargets;
23use crate::core::resolver::{HasDevUnits, Resolve};
24use crate::core::{
25    CliUnstable, Dependency, Features, Manifest, PackageId, PackageIdSpec, SerializedDependency,
26    SourceId, Target,
27};
28use crate::core::{Summary, Workspace};
29use crate::sources::source::{MaybePackage, SourceMap};
30use crate::util::cache_lock::{CacheLock, CacheLockMode};
31use crate::util::errors::{CargoResult, HttpNotSuccessful};
32use crate::util::interning::InternedString;
33use crate::util::network::http::http_handle_and_timeout;
34use crate::util::network::http::HttpTimeout;
35use crate::util::network::retry::{Retry, RetryResult};
36use crate::util::network::sleep::SleepTracker;
37use crate::util::HumanBytes;
38use crate::util::{self, internal, GlobalContext, Progress, ProgressStyle};
39
/// Information about a package that is available somewhere in the file system.
///
/// A package is a `Cargo.toml` file plus all the files that are part of it.
///
/// Cloning a `Package` is cheap: the payload is behind an `Rc`, so a clone
/// only bumps the reference count (`manifest_mut` copies on write).
#[derive(Clone)]
pub struct Package {
    inner: Rc<PackageInner>,
}
47
/// Shared payload of a `Package`, kept behind an `Rc` for cheap clones.
#[derive(Clone)]
// TODO: is `manifest_path` a relic?
struct PackageInner {
    /// The package's manifest.
    manifest: Manifest,
    /// Path to the `Cargo.toml` file; the package root is its parent
    /// directory (see `Package::root`).
    manifest_path: PathBuf,
}
56
57impl Ord for Package {
58    fn cmp(&self, other: &Package) -> Ordering {
59        self.package_id().cmp(&other.package_id())
60    }
61}
62
63impl PartialOrd for Package {
64    fn partial_cmp(&self, other: &Package) -> Option<Ordering> {
65        Some(self.cmp(other))
66    }
67}
68
/// A Package in a form where `Serialize` can be derived.
///
/// Built by `Package::serialized`; field names become the keys of the
/// serialized output.
#[derive(Serialize)]
pub struct SerializedPackage {
    name: InternedString,
    version: Version,
    id: PackageIdSpec,
    license: Option<String>,
    license_file: Option<String>,
    description: Option<String>,
    source: SourceId,
    dependencies: Vec<SerializedDependency>,
    /// Targets with metabuild entries filtered out (see `Package::serialized`).
    targets: Vec<Target>,
    /// Feature name mapped to its feature values, flattened to strings.
    features: BTreeMap<InternedString, Vec<InternedString>>,
    manifest_path: PathBuf,
    /// The custom `package.metadata` table from the manifest, if any.
    metadata: Option<toml::Value>,
    publish: Option<Vec<String>>,
    authors: Vec<String>,
    categories: Vec<String>,
    keywords: Vec<String>,
    readme: Option<String>,
    repository: Option<String>,
    homepage: Option<String>,
    documentation: Option<String>,
    edition: String,
    links: Option<String>,
    // Omitted from output entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    metabuild: Option<Vec<String>>,
    default_run: Option<String>,
    rust_version: Option<RustVersion>,
}
99
100impl Package {
101    /// Creates a package from a manifest and its location.
102    pub fn new(manifest: Manifest, manifest_path: &Path) -> Package {
103        Package {
104            inner: Rc::new(PackageInner {
105                manifest,
106                manifest_path: manifest_path.to_path_buf(),
107            }),
108        }
109    }
110
111    /// Gets the manifest dependencies.
112    pub fn dependencies(&self) -> &[Dependency] {
113        self.manifest().dependencies()
114    }
115    /// Gets the manifest.
116    pub fn manifest(&self) -> &Manifest {
117        &self.inner.manifest
118    }
119    /// Gets the manifest.
120    pub fn manifest_mut(&mut self) -> &mut Manifest {
121        &mut Rc::make_mut(&mut self.inner).manifest
122    }
123    /// Gets the path to the manifest.
124    pub fn manifest_path(&self) -> &Path {
125        &self.inner.manifest_path
126    }
127    /// Gets the name of the package.
128    pub fn name(&self) -> InternedString {
129        self.package_id().name()
130    }
131    /// Gets the `PackageId` object for the package (fully defines a package).
132    pub fn package_id(&self) -> PackageId {
133        self.manifest().package_id()
134    }
135    /// Gets the root folder of the package.
136    pub fn root(&self) -> &Path {
137        self.manifest_path().parent().unwrap()
138    }
139    /// Gets the summary for the package.
140    pub fn summary(&self) -> &Summary {
141        self.manifest().summary()
142    }
143    /// Gets the targets specified in the manifest.
144    pub fn targets(&self) -> &[Target] {
145        self.manifest().targets()
146    }
147    /// Gets the library crate for this package, if it exists.
148    pub fn library(&self) -> Option<&Target> {
149        self.targets().iter().find(|t| t.is_lib())
150    }
151    /// Gets the current package version.
152    pub fn version(&self) -> &Version {
153        self.package_id().version()
154    }
155    /// Gets the package authors.
156    pub fn authors(&self) -> &Vec<String> {
157        &self.manifest().metadata().authors
158    }
159
160    /// Returns `None` if the package is set to publish.
161    /// Returns `Some(allowed_registries)` if publishing is limited to specified
162    /// registries or if package is set to not publish.
163    pub fn publish(&self) -> &Option<Vec<String>> {
164        self.manifest().publish()
165    }
166    /// Returns `true` if this package is a proc-macro.
167    pub fn proc_macro(&self) -> bool {
168        self.targets().iter().any(|target| target.proc_macro())
169    }
170    /// Gets the package's minimum Rust version.
171    pub fn rust_version(&self) -> Option<&RustVersion> {
172        self.manifest().rust_version()
173    }
174
175    /// Returns `true` if the package uses a custom build script for any target.
176    pub fn has_custom_build(&self) -> bool {
177        self.targets().iter().any(|t| t.is_custom_build())
178    }
179
180    pub fn map_source(self, to_replace: SourceId, replace_with: SourceId) -> Package {
181        Package {
182            inner: Rc::new(PackageInner {
183                manifest: self.manifest().clone().map_source(to_replace, replace_with),
184                manifest_path: self.manifest_path().to_owned(),
185            }),
186        }
187    }
188
189    pub fn serialized(
190        &self,
191        unstable_flags: &CliUnstable,
192        cargo_features: &Features,
193    ) -> SerializedPackage {
194        let summary = self.manifest().summary();
195        let package_id = summary.package_id();
196        let manmeta = self.manifest().metadata();
197        // Filter out metabuild targets. They are an internal implementation
198        // detail that is probably not relevant externally. There's also not a
199        // real path to show in `src_path`, and this avoids changing the format.
200        let targets: Vec<Target> = self
201            .manifest()
202            .targets()
203            .iter()
204            .filter(|t| t.src_path().is_path())
205            .cloned()
206            .collect();
207        // Convert Vec<FeatureValue> to Vec<InternedString>
208        let crate_features = summary
209            .features()
210            .iter()
211            .map(|(k, v)| (*k, v.iter().map(|fv| fv.to_string().into()).collect()))
212            .collect();
213
214        SerializedPackage {
215            name: package_id.name(),
216            version: package_id.version().clone(),
217            id: package_id.to_spec(),
218            license: manmeta.license.clone(),
219            license_file: manmeta.license_file.clone(),
220            description: manmeta.description.clone(),
221            source: summary.source_id(),
222            dependencies: summary
223                .dependencies()
224                .iter()
225                .map(|dep| dep.serialized(unstable_flags, cargo_features))
226                .collect(),
227            targets,
228            features: crate_features,
229            manifest_path: self.manifest_path().to_path_buf(),
230            metadata: self.manifest().custom_metadata().cloned(),
231            authors: manmeta.authors.clone(),
232            categories: manmeta.categories.clone(),
233            keywords: manmeta.keywords.clone(),
234            readme: manmeta.readme.clone(),
235            repository: manmeta.repository.clone(),
236            homepage: manmeta.homepage.clone(),
237            documentation: manmeta.documentation.clone(),
238            edition: self.manifest().edition().to_string(),
239            links: self.manifest().links().map(|s| s.to_owned()),
240            metabuild: self.manifest().metabuild().cloned(),
241            publish: self.publish().as_ref().cloned(),
242            default_run: self.manifest().default_run().map(|s| s.to_owned()),
243            rust_version: self.rust_version().cloned(),
244        }
245    }
246}
247
248impl fmt::Display for Package {
249    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
250        write!(f, "{}", self.summary().package_id())
251    }
252}
253
254impl fmt::Debug for Package {
255    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
256        f.debug_struct("Package")
257            .field("id", &self.summary().package_id())
258            .field("..", &"..")
259            .finish()
260    }
261}
262
263impl PartialEq for Package {
264    fn eq(&self, other: &Package) -> bool {
265        self.package_id() == other.package_id()
266    }
267}
268
269impl Eq for Package {}
270
271impl hash::Hash for Package {
272    fn hash<H: hash::Hasher>(&self, into: &mut H) {
273        self.package_id().hash(into)
274    }
275}
276
/// A set of packages, with the intent to download.
///
/// This is primarily used to convert a set of `PackageId`s to `Package`s. It
/// will download as needed, or use the cached download if available.
pub struct PackageSet<'gctx> {
    /// One lazily-filled slot per tracked package id; a slot is filled once
    /// its package has been obtained.
    packages: HashMap<PackageId, LazyCell<Package>>,
    /// The sources the packages are fetched from.
    sources: RefCell<SourceMap<'gctx>>,
    gctx: &'gctx GlobalContext,
    /// The curl `Multi` handle that drives all concurrent transfers.
    multi: Multi,
    /// Used to prevent reusing the `PackageSet` to download twice.
    downloading: Cell<bool>,
    /// Whether or not to use curl HTTP/2 multiplexing.
    multiplexing: bool,
}
291
/// Helper for downloading crates.
pub struct Downloads<'a, 'gctx> {
    /// The package set this download session was created from.
    set: &'a PackageSet<'gctx>,
    /// When a download is started, it is added to this map. The key is a
    /// "token" (see `Download::token`). It is removed once the download is
    /// finished.
    pending: HashMap<usize, (Download<'gctx>, EasyHandle)>,
    /// Set of packages currently being downloaded. This should stay in sync
    /// with `pending`.
    pending_ids: HashSet<PackageId>,
    /// Downloads that have failed and are waiting to retry again later.
    sleeping: SleepTracker<(Download<'gctx>, Easy)>,
    /// The final result of each download. A pair `(token, result)`. This is a
    /// temporary holding area, needed because curl can report multiple
    /// downloads at once, but the main loop (`wait`) is written to only
    /// handle one at a time.
    results: Vec<(usize, Result<(), curl::Error>)>,
    /// The next ID to use for creating a token (see `Download::token`).
    next: usize,
    /// Progress bar.
    progress: RefCell<Option<Progress<'gctx>>>,
    /// Number of downloads that have successfully finished.
    downloads_finished: usize,
    /// Total bytes for all successfully downloaded packages.
    downloaded_bytes: u64,
    /// Size (in bytes) and package name of the largest downloaded package.
    largest: (u64, InternedString),
    /// Time when downloading started.
    start: Instant,
    /// Indicates *all* downloads were successful.
    success: bool,

    /// Timeout management, both of timeout thresholds as well as whether or not
    /// our connection has timed out (and accompanying message if it has).
    ///
    /// Note that timeout management is done manually here instead of in libcurl
    /// because we want to apply timeouts to an entire batch of operations, not
    /// any one particular single operation.
    timeout: HttpTimeout,
    /// Last time bytes were received.
    updated_at: Cell<Instant>,
    /// This is a slow-speed check. It is reset to `now + timeout_duration`
    /// every time at least `threshold` bytes are received. If the current
    /// time ever exceeds `next_speed_check`, then give up and report a
    /// timeout error.
    next_speed_check: Cell<Instant>,
    /// This is the slow-speed threshold byte count. It starts at the
    /// configured threshold value (default 10), and is decremented by the
    /// number of bytes received in each chunk. If it is <= zero, the
    /// threshold has been met and data is being received fast enough not to
    /// trigger a timeout; reset `next_speed_check` and set this back to the
    /// configured threshold.
    next_speed_check_bytes_threshold: Cell<u64>,
    /// Global filesystem lock to ensure only one Cargo is downloading at a
    /// time.
    _lock: CacheLock<'gctx>,
}
349
/// State for a single in-flight crate transfer.
struct Download<'gctx> {
    /// The token for this download, used as the key of the `Downloads::pending` map
    /// and stored in `EasyHandle` as well.
    token: usize,

    /// The package that we're downloading.
    id: PackageId,

    /// Actual downloaded data, updated throughout the lifetime of this download.
    data: RefCell<Vec<u8>>,

    /// HTTP headers for debugging.
    headers: RefCell<Vec<String>>,

    /// The URL that we're downloading from, cached here for error messages and
    /// reenqueuing.
    url: String,

    /// A descriptive string to print when we've finished downloading this crate.
    descriptor: String,

    /// Statistics updated from the progress callback in libcurl.
    /// `total` is the expected size reported by curl; `current` is the bytes
    /// received so far.
    total: Cell<u64>,
    current: Cell<u64>,

    /// The moment we started this transfer at.
    start: Instant,
    /// Set to a descriptive message when the transfer is considered timed
    /// out; consumed in `Downloads::wait` to enrich the curl error.
    timed_out: Cell<Option<String>>,

    /// Logic used to track retrying this download if it's a spurious failure.
    retry: Retry<'gctx>,
}
382
impl<'gctx> PackageSet<'gctx> {
    /// Creates a set tracking `package_ids`, fetching lazily from `sources`.
    pub fn new(
        package_ids: &[PackageId],
        sources: SourceMap<'gctx>,
        gctx: &'gctx GlobalContext,
    ) -> CargoResult<PackageSet<'gctx>> {
        // We've enabled the `http2` feature of `curl` in Cargo, so treat
        // failures here as fatal as it would indicate a build-time problem.
        let mut multi = Multi::new();
        let multiplexing = gctx.http_config()?.multiplexing.unwrap_or(true);
        multi
            .pipelining(false, multiplexing)
            .context("failed to enable multiplexing/pipelining in curl")?;

        // let's not flood crates.io with connections
        multi.set_max_host_connections(2)?;

        Ok(PackageSet {
            packages: package_ids
                .iter()
                .map(|&id| (id, LazyCell::new()))
                .collect(),
            sources: RefCell::new(sources),
            gctx,
            multi,
            downloading: Cell::new(false),
            multiplexing,
        })
    }

    /// Returns the ids of all packages tracked by this set, downloaded or not.
    pub fn package_ids(&self) -> impl Iterator<Item = PackageId> + '_ {
        self.packages.keys().cloned()
    }

    /// Returns only the packages whose slots have already been filled.
    pub fn packages(&self) -> impl Iterator<Item = &Package> {
        self.packages.values().filter_map(|p| p.borrow())
    }

    /// Starts a download session.
    ///
    /// Panics if a session was already started on this set (guarded by the
    /// `downloading` flag). Acquires the global download lock for the
    /// lifetime of the returned `Downloads`.
    pub fn enable_download<'a>(&'a self) -> CargoResult<Downloads<'a, 'gctx>> {
        assert!(!self.downloading.replace(true));
        let timeout = HttpTimeout::new(self.gctx)?;
        Ok(Downloads {
            start: Instant::now(),
            set: self,
            next: 0,
            pending: HashMap::new(),
            pending_ids: HashSet::new(),
            sleeping: SleepTracker::new(),
            results: Vec::new(),
            progress: RefCell::new(Some(Progress::with_style(
                "Downloading",
                ProgressStyle::Ratio,
                self.gctx,
            ))),
            downloads_finished: 0,
            downloaded_bytes: 0,
            largest: (0, "".into()),
            success: false,
            updated_at: Cell::new(Instant::now()),
            timeout,
            next_speed_check: Cell::new(Instant::now()),
            next_speed_check_bytes_threshold: Cell::new(0),
            _lock: self
                .gctx
                .acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?,
        })
    }

    /// Gets a single package, downloading it first if it is not yet cached.
    pub fn get_one(&self, id: PackageId) -> CargoResult<&Package> {
        // Fast path: the slot is already filled.
        if let Some(pkg) = self.packages.get(&id).and_then(|slot| slot.borrow()) {
            return Ok(pkg);
        }
        // Slow path: download via `get_many`, which returns exactly one
        // package here since we passed exactly one id.
        Ok(self.get_many(Some(id))?.remove(0))
    }

    /// Gets the given packages, downloading any that are missing, and blocks
    /// until all downloads complete.
    pub fn get_many(&self, ids: impl IntoIterator<Item = PackageId>) -> CargoResult<Vec<&Package>> {
        let mut pkgs = Vec::new();
        let _lock = self
            .gctx
            .acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?;
        let mut downloads = self.enable_download()?;
        for id in ids {
            // `start` returns `Some(pkg)` for already-available packages and
            // `None` for ones that were enqueued for download.
            pkgs.extend(downloads.start(id)?);
        }
        while downloads.remaining() > 0 {
            pkgs.push(downloads.wait()?);
        }
        downloads.success = true;
        drop(downloads);

        let mut deferred = self.gctx.deferred_global_last_use()?;
        deferred.save_no_error(self.gctx);
        Ok(pkgs)
    }

    /// Downloads any packages accessible from the given root ids.
    #[tracing::instrument(skip_all)]
    pub fn download_accessible(
        &self,
        resolve: &Resolve,
        root_ids: &[PackageId],
        has_dev_units: HasDevUnits,
        requested_kinds: &[CompileKind],
        target_data: &RustcTargetData<'gctx>,
        force_all_targets: ForceAllTargets,
    ) -> CargoResult<()> {
        /// Recursively walks the resolve graph from `pkg_id`, recording every
        /// reachable `(package, compile kind)` pair in `used`. Artifact
        /// dependencies with an explicit target are walked again under that
        /// target's compile kind.
        fn collect_used_deps(
            used: &mut BTreeSet<(PackageId, CompileKind)>,
            resolve: &Resolve,
            pkg_id: PackageId,
            has_dev_units: HasDevUnits,
            requested_kind: CompileKind,
            target_data: &RustcTargetData<'_>,
            force_all_targets: ForceAllTargets,
        ) -> CargoResult<()> {
            // Already visited under this kind; stop the recursion here.
            if !used.insert((pkg_id, requested_kind)) {
                return Ok(());
            }
            let requested_kinds = &[requested_kind];
            let filtered_deps = PackageSet::filter_deps(
                pkg_id,
                resolve,
                has_dev_units,
                requested_kinds,
                target_data,
                force_all_targets,
            );
            for (pkg_id, deps) in filtered_deps {
                collect_used_deps(
                    used,
                    resolve,
                    pkg_id,
                    has_dev_units,
                    requested_kind,
                    target_data,
                    force_all_targets,
                )?;
                // Resolve `target = "..."` artifact deps against the single
                // requested kind (the one-element slice built above).
                let artifact_kinds = deps.iter().filter_map(|dep| {
                    Some(
                        dep.artifact()?
                            .target()?
                            .to_resolved_compile_kind(*requested_kinds.iter().next().unwrap()),
                    )
                });
                for artifact_kind in artifact_kinds {
                    collect_used_deps(
                        used,
                        resolve,
                        pkg_id,
                        has_dev_units,
                        artifact_kind,
                        target_data,
                        force_all_targets,
                    )?;
                }
            }
            Ok(())
        }

        // This is sorted by PackageId to get consistent behavior and error
        // messages for Cargo's testsuite. Perhaps there is a better ordering
        // that optimizes download time?
        let mut to_download = BTreeSet::new();

        for id in root_ids {
            for requested_kind in requested_kinds {
                collect_used_deps(
                    &mut to_download,
                    resolve,
                    *id,
                    has_dev_units,
                    *requested_kind,
                    target_data,
                    force_all_targets,
                )?;
            }
        }
        // Drop the compile-kind component; a package only needs downloading
        // once regardless of how many kinds referenced it.
        let to_download = to_download
            .into_iter()
            .map(|(p, _)| p)
            .collect::<BTreeSet<_>>();
        self.get_many(to_download.into_iter())?;
        Ok(())
    }

    /// Check if there are any dependency packages that violate artifact constraints
    /// to instantly abort, or that do not have any libs which results in warnings.
    pub(crate) fn warn_no_lib_packages_and_artifact_libs_overlapping_deps(
        &self,
        ws: &Workspace<'gctx>,
        resolve: &Resolve,
        root_ids: &[PackageId],
        has_dev_units: HasDevUnits,
        requested_kinds: &[CompileKind],
        target_data: &RustcTargetData<'_>,
        force_all_targets: ForceAllTargets,
    ) -> CargoResult<()> {
        // For each root, collect its deps that are depended on as a lib but
        // whose package actually has no lib target.
        let no_lib_pkgs: BTreeMap<PackageId, Vec<(&Package, &HashSet<Dependency>)>> = root_ids
            .iter()
            .map(|&root_id| {
                let dep_pkgs_to_deps: Vec<_> = PackageSet::filter_deps(
                    root_id,
                    resolve,
                    has_dev_units,
                    requested_kinds,
                    target_data,
                    force_all_targets,
                )
                .collect();

                let dep_pkgs_and_deps = dep_pkgs_to_deps
                    .into_iter()
                    .filter(|(_id, deps)| deps.iter().any(|dep| dep.maybe_lib()))
                    .filter_map(|(dep_package_id, deps)| {
                        self.get_one(dep_package_id).ok().and_then(|dep_pkg| {
                            (!dep_pkg.targets().iter().any(|t| t.is_lib())).then(|| (dep_pkg, deps))
                        })
                    })
                    .collect();
                (root_id, dep_pkgs_and_deps)
            })
            .collect();

        for (pkg_id, dep_pkgs) in no_lib_pkgs {
            for (_dep_pkg_without_lib_target, deps) in dep_pkgs {
                // Warn only for deps that require a lib (an artifact dep that
                // is artifact-only is exempt).
                for dep in deps.iter().filter(|dep| {
                    dep.artifact()
                        .map(|artifact| artifact.is_lib())
                        .unwrap_or(true)
                }) {
                    ws.gctx().shell().warn(&format!(
                        "{} ignoring invalid dependency `{}` which is missing a lib target",
                        pkg_id,
                        dep.name_in_toml(),
                    ))?;
                }
            }
        }
        Ok(())
    }

    /// Yields `pkg_id`'s resolved dependencies, skipping edges that are
    /// dev-only when dev units are disabled, and (unless all targets are
    /// forced) edges whose platform is not activated for any requested kind
    /// or the host.
    fn filter_deps<'a>(
        pkg_id: PackageId,
        resolve: &'a Resolve,
        has_dev_units: HasDevUnits,
        requested_kinds: &'a [CompileKind],
        target_data: &'a RustcTargetData<'_>,
        force_all_targets: ForceAllTargets,
    ) -> impl Iterator<Item = (PackageId, &'a HashSet<Dependency>)> + 'a {
        resolve
            .deps(pkg_id)
            .filter(move |&(_id, deps)| {
                deps.iter().any(|dep| {
                    if dep.kind() == DepKind::Development && has_dev_units == HasDevUnits::No {
                        return false;
                    }
                    if force_all_targets == ForceAllTargets::No {
                        let activated = requested_kinds
                            .iter()
                            .chain(Some(&CompileKind::Host))
                            .any(|kind| target_data.dep_platform_activated(dep, *kind));
                        if !activated {
                            return false;
                        }
                    }
                    true
                })
            })
            // NOTE(review): `.into_iter()` on something that is already an
            // iterator is a no-op and could be removed.
            .into_iter()
    }

    /// Borrows the source map (shared).
    pub fn sources(&self) -> Ref<'_, SourceMap<'gctx>> {
        self.sources.borrow()
    }

    /// Borrows the source map (exclusive).
    pub fn sources_mut(&self) -> RefMut<'_, SourceMap<'gctx>> {
        self.sources.borrow_mut()
    }

    /// Merge the given set into self.
    ///
    /// Panics if either set is mid-download. Existing package slots in `self`
    /// win over slots from `set`.
    pub fn add_set(&mut self, set: PackageSet<'gctx>) {
        assert!(!self.downloading.get());
        assert!(!set.downloading.get());
        for (pkg_id, p_cell) in set.packages {
            self.packages.entry(pkg_id).or_insert(p_cell);
        }
        let mut sources = self.sources.borrow_mut();
        let other_sources = set.sources.into_inner();
        sources.add_source_map(other_sources);
    }
}
674
675impl<'a, 'gctx> Downloads<'a, 'gctx> {
676    /// Starts to download the package for the `id` specified.
677    ///
678    /// Returns `None` if the package is queued up for download and will
679    /// eventually be returned from `wait_for_download`. Returns `Some(pkg)` if
680    /// the package is ready and doesn't need to be downloaded.
681    #[tracing::instrument(skip_all)]
682    pub fn start(&mut self, id: PackageId) -> CargoResult<Option<&'a Package>> {
683        self.start_inner(id)
684            .with_context(|| format!("failed to download `{}`", id))
685    }
686
687    fn start_inner(&mut self, id: PackageId) -> CargoResult<Option<&'a Package>> {
688        // First up see if we've already cached this package, in which case
689        // there's nothing to do.
690        let slot = self
691            .set
692            .packages
693            .get(&id)
694            .ok_or_else(|| internal(format!("couldn't find `{}` in package set", id)))?;
695        if let Some(pkg) = slot.borrow() {
696            return Ok(Some(pkg));
697        }
698
699        // Ask the original source for this `PackageId` for the corresponding
700        // package. That may immediately come back and tell us that the package
701        // is ready, or it could tell us that it needs to be downloaded.
702        let mut sources = self.set.sources.borrow_mut();
703        let source = sources
704            .get_mut(id.source_id())
705            .ok_or_else(|| internal(format!("couldn't find source for `{}`", id)))?;
706        let pkg = source
707            .download(id)
708            .context("unable to get packages from source")?;
709        let (url, descriptor, authorization) = match pkg {
710            MaybePackage::Ready(pkg) => {
711                debug!("{} doesn't need a download", id);
712                assert!(slot.fill(pkg).is_ok());
713                return Ok(Some(slot.borrow().unwrap()));
714            }
715            MaybePackage::Download {
716                url,
717                descriptor,
718                authorization,
719            } => (url, descriptor, authorization),
720        };
721
722        // Ok we're going to download this crate, so let's set up all our
723        // internal state and hand off an `Easy` handle to our libcurl `Multi`
724        // handle. This won't actually start the transfer, but later it'll
725        // happen during `wait_for_download`
726        let token = self.next;
727        self.next += 1;
728        debug!(target: "network", "downloading {} as {}", id, token);
729        assert!(self.pending_ids.insert(id));
730
731        let (mut handle, _timeout) = http_handle_and_timeout(self.set.gctx)?;
732        handle.get(true)?;
733        handle.url(&url)?;
734        handle.follow_location(true)?; // follow redirects
735
736        // Add authorization header.
737        if let Some(authorization) = authorization {
738            let mut headers = curl::easy::List::new();
739            headers.append(&format!("Authorization: {}", authorization))?;
740            handle.http_headers(headers)?;
741        }
742
743        // Enable HTTP/2 if possible.
744        crate::try_old_curl_http2_pipewait!(self.set.multiplexing, handle);
745
746        handle.write_function(move |buf| {
747            debug!(target: "network", "{} - {} bytes of data", token, buf.len());
748            tls::with(|downloads| {
749                if let Some(downloads) = downloads {
750                    downloads.pending[&token]
751                        .0
752                        .data
753                        .borrow_mut()
754                        .extend_from_slice(buf);
755                }
756            });
757            Ok(buf.len())
758        })?;
759        handle.header_function(move |data| {
760            tls::with(|downloads| {
761                if let Some(downloads) = downloads {
762                    // Headers contain trailing \r\n, trim them to make it easier
763                    // to work with.
764                    let h = String::from_utf8_lossy(data).trim().to_string();
765                    downloads.pending[&token].0.headers.borrow_mut().push(h);
766                }
767            });
768            true
769        })?;
770
771        handle.progress(true)?;
772        handle.progress_function(move |dl_total, dl_cur, _, _| {
773            tls::with(|downloads| match downloads {
774                Some(d) => d.progress(token, dl_total as u64, dl_cur as u64),
775                None => false,
776            })
777        })?;
778
779        // If the progress bar isn't enabled then it may be awhile before the
780        // first crate finishes downloading so we inform immediately that we're
781        // downloading crates here.
782        if self.downloads_finished == 0
783            && self.pending.is_empty()
784            && !self.progress.borrow().as_ref().unwrap().is_enabled()
785        {
786            self.set.gctx.shell().status("Downloading", "crates ...")?;
787        }
788
789        let dl = Download {
790            token,
791            data: RefCell::new(Vec::new()),
792            headers: RefCell::new(Vec::new()),
793            id,
794            url,
795            descriptor,
796            total: Cell::new(0),
797            current: Cell::new(0),
798            start: Instant::now(),
799            timed_out: Cell::new(None),
800            retry: Retry::new(self.set.gctx)?,
801        };
802        self.enqueue(dl, handle)?;
803        self.tick(WhyTick::DownloadStarted)?;
804
805        Ok(None)
806    }
807
808    /// Returns the number of crates that are still downloading.
809    pub fn remaining(&self) -> usize {
810        self.pending.len() + self.sleeping.len()
811    }
812
    /// Blocks the current thread waiting for a package to finish downloading.
    ///
    /// This method will wait for a previously enqueued package to finish
    /// downloading and return a reference to it after it's done downloading.
    ///
    /// Spurious/retryable failures are not returned: the transfer is moved to
    /// the `sleeping` queue and this method keeps waiting for another transfer
    /// to finish.
    ///
    /// # Panics
    ///
    /// This function will panic if there are no remaining downloads.
    #[tracing::instrument(skip_all)]
    pub fn wait(&mut self) -> CargoResult<&'a Package> {
        let (dl, data) = loop {
            // Invariant: every in-flight transfer has exactly one pending id.
            assert_eq!(self.pending.len(), self.pending_ids.len());
            let (token, result) = self.wait_for_curl()?;
            debug!(target: "network", "{} finished with {:?}", token, result);

            // A transfer completed (or errored): pull it out of the in-flight
            // set and take ownership of its buffered body and headers.
            let (mut dl, handle) = self
                .pending
                .remove(&token)
                .expect("got a token for a non-in-progress transfer");
            let data = mem::take(&mut *dl.data.borrow_mut());
            let headers = mem::take(&mut *dl.headers.borrow_mut());
            // Detach the easy handle from the `Multi` so it can be queried
            // directly (response code) or re-enqueued on retry below.
            let mut handle = self.set.multi.remove(handle)?;
            self.pending_ids.remove(&dl.id);

            // Check if this was a spurious error. If it was a spurious error
            // then we want to re-enqueue our request for another attempt and
            // then we wait for another request to finish.
            let ret = {
                let timed_out = &dl.timed_out;
                let url = &dl.url;
                dl.retry.r#try(|| {
                    if let Err(e) = result {
                        // If this error is "aborted by callback" then that's
                        // probably because our progress callback aborted due to
                        // a timeout. We'll find out by looking at the
                        // `timed_out` field, looking for a descriptive message.
                        // If one is found we switch the error code (to ensure
                        // it's flagged as spurious) and then attach our extra
                        // information to the error.
                        if !e.is_aborted_by_callback() {
                            return Err(e.into());
                        }

                        return Err(match timed_out.replace(None) {
                            Some(msg) => {
                                let code = curl_sys::CURLE_OPERATION_TIMEDOUT;
                                let mut err = curl::Error::new(code);
                                err.set_extra(msg);
                                err
                            }
                            None => e,
                        }
                        .into());
                    }

                    // A code of 0 means curl reported no HTTP status (e.g. a
                    // non-HTTP transfer); anything else must be a 200.
                    let code = handle.response_code()?;
                    if code != 200 && code != 0 {
                        return Err(HttpNotSuccessful::new_from_handle(
                            &mut handle,
                            &url,
                            data,
                            headers,
                        )
                        .into());
                    }
                    Ok(data)
                })
            };
            match ret {
                RetryResult::Success(data) => break (dl, data),
                RetryResult::Err(e) => {
                    return Err(e.context(format!("failed to download from `{}`", dl.url)))
                }
                RetryResult::Retry(sleep) => {
                    debug!(target: "network", "download retry {} for {sleep}ms", dl.url);
                    // Park the transfer; `add_sleepers` re-enqueues it once
                    // the backoff elapses.
                    self.sleeping.push(sleep, (dl, handle));
                }
            }
        };

        // If the progress bar isn't enabled then we still want to provide some
        // semblance of progress of how we're downloading crates, and if the
        // progress bar is enabled this provides a good log of what's happening.
        self.progress.borrow_mut().as_mut().unwrap().clear();
        self.set.gctx.shell().status("Downloaded", &dl.descriptor)?;

        // Update summary statistics used by the `Drop` impl's final report.
        self.downloads_finished += 1;
        self.downloaded_bytes += dl.total.get();
        if dl.total.get() > self.largest.0 {
            self.largest = (dl.total.get(), dl.id.name());
        }

        // We're about to synchronously extract the crate below. While we're
        // doing that our download progress won't actually be updated, nor do we
        // have a great view into the progress of the extraction. Let's prepare
        // the user for this CPU-heavy step if it looks like it'll take some
        // time to do so.
        let kib_400 = 1024 * 400;
        if dl.total.get() < kib_400 {
            self.tick(WhyTick::DownloadFinished)?;
        } else {
            self.tick(WhyTick::Extracting(&dl.id.name()))?;
        }

        // Inform the original source that the download is finished which
        // should allow us to actually get the package and fill it in now.
        let mut sources = self.set.sources.borrow_mut();
        let source = sources
            .get_mut(dl.id.source_id())
            .ok_or_else(|| internal(format!("couldn't find source for `{}`", dl.id)))?;
        let start = Instant::now();
        let pkg = source.finish_download(dl.id, data)?;

        // Assume that no time has passed while we were calling
        // `finish_download`, update all speed checks and timeout limits of all
        // active downloads to make sure they don't fire because of a slowly
        // extracted tarball.
        let finish_dur = start.elapsed();
        self.updated_at.set(self.updated_at.get() + finish_dur);
        self.next_speed_check
            .set(self.next_speed_check.get() + finish_dur);

        // Publish the finished package into its pre-registered slot (must not
        // already be filled) and hand back a borrow of it.
        let slot = &self.set.packages[&dl.id];
        assert!(slot.fill(pkg).is_ok());
        Ok(slot.borrow().unwrap())
    }
939
940    fn enqueue(&mut self, dl: Download<'gctx>, handle: Easy) -> CargoResult<()> {
941        let mut handle = self.set.multi.add(handle)?;
942        let now = Instant::now();
943        handle.set_token(dl.token)?;
944        self.updated_at.set(now);
945        self.next_speed_check.set(now + self.timeout.dur);
946        self.next_speed_check_bytes_threshold
947            .set(u64::from(self.timeout.low_speed_limit));
948        dl.timed_out.set(None);
949        dl.current.set(0);
950        dl.total.set(0);
951        self.pending.insert(dl.token, (dl, handle));
952        Ok(())
953    }
954
    /// Block, waiting for curl. Returns a token and a `Result` for that token
    /// (`Ok` means the download successfully finished).
    fn wait_for_curl(&mut self) -> CargoResult<(usize, Result<(), curl::Error>)> {
        // This is the main workhorse loop. We use libcurl's portable `wait`
        // method to actually perform blocking. This isn't necessarily too
        // efficient in terms of fd management, but we should only be juggling
        // a few anyway.
        //
        // Here we start off by asking the `multi` handle to do some work via
        // the `perform` method. This will actually do I/O work (non-blocking)
        // and attempt to make progress. Afterwards we ask about the `messages`
        // contained in the handle which will inform us if anything has finished
        // transferring.
        //
        // If we've got a finished transfer after all that work we break out
        // and process the finished transfer at the end. Otherwise we need to
        // actually block waiting for I/O to happen, which we achieve with the
        // `wait` method on `multi`.
        loop {
            // Re-enqueue any retry-delayed downloads whose backoff elapsed.
            self.add_sleepers()?;
            // `perform` runs inside `tls::set` so curl's header/progress
            // callbacks can reach this `Downloads` through the thread-local.
            let n = tls::set(self, || {
                self.set
                    .multi
                    .perform()
                    .context("failed to perform http requests")
            })?;
            debug!(target: "network", "handles remaining: {}", n);
            // Drain curl's completion messages into `self.results`, keyed by
            // the token assigned in `enqueue`.
            let results = &mut self.results;
            let pending = &self.pending;
            self.set.multi.messages(|msg| {
                let token = msg.token().expect("failed to read token");
                let handle = &pending[&token].1;
                if let Some(result) = msg.result_for(handle) {
                    results.push((token, result));
                } else {
                    debug!(target: "network", "message without a result (?)");
                }
            });

            if let Some(pair) = results.pop() {
                break Ok(pair);
            }
            // Nothing finished yet; something must still be in flight or
            // sleeping, otherwise we'd block here forever.
            assert_ne!(self.remaining(), 0);
            if self.pending.is_empty() {
                // Only sleeping retries remain: sleep until the next is due.
                let delay = self.sleeping.time_to_next().unwrap();
                debug!(target: "network", "sleeping main thread for {delay:?}");
                std::thread::sleep(delay);
            } else {
                // Block on curl for I/O readiness, but cap the wait at one
                // second so sleeping retries get re-checked promptly.
                let min_timeout = Duration::new(1, 0);
                let timeout = self.set.multi.get_timeout()?.unwrap_or(min_timeout);
                let timeout = timeout.min(min_timeout);
                self.set
                    .multi
                    .wait(&mut [], timeout)
                    .context("failed to wait on curl `Multi`")?;
            }
        }
    }
1013
1014    fn add_sleepers(&mut self) -> CargoResult<()> {
1015        for (dl, handle) in self.sleeping.to_retry() {
1016            self.pending_ids.insert(dl.id);
1017            self.enqueue(dl, handle)?;
1018        }
1019        Ok(())
1020    }
1021
    /// Progress callback for an in-flight download (registered with curl's
    /// `progress_function` when the transfer was started).
    ///
    /// Updates the download's byte counters and the shared speed-check and
    /// stall-timeout bookkeeping. Returning `false` aborts the transfer; the
    /// "aborted by callback" error is then translated back into a timeout
    /// error in `wait` via the `timed_out` message set here.
    fn progress(&self, token: usize, total: u64, cur: u64) -> bool {
        let dl = &self.pending[&token].0;
        dl.total.set(total);
        let now = Instant::now();
        if cur > dl.current.get() {
            let delta = cur - dl.current.get();
            let threshold = self.next_speed_check_bytes_threshold.get();

            dl.current.set(cur);
            self.updated_at.set(now);

            if delta >= threshold {
                // Enough data arrived: push the next speed check out a full
                // timeout interval and reset the byte threshold.
                self.next_speed_check.set(now + self.timeout.dur);
                self.next_speed_check_bytes_threshold
                    .set(u64::from(self.timeout.low_speed_limit));
            } else {
                // Partial credit: only the remaining bytes are required by
                // the already-scheduled speed check.
                self.next_speed_check_bytes_threshold.set(threshold - delta);
            }
        }
        if self.tick(WhyTick::DownloadUpdate).is_err() {
            return false;
        }

        // If we've spent too long not actually receiving any data we time out.
        if now > self.updated_at.get() + self.timeout.dur {
            self.updated_at.set(now);
            let msg = format!(
                "failed to download any data for `{}` within {}s",
                dl.id,
                self.timeout.dur.as_secs()
            );
            dl.timed_out.set(Some(msg));
            return false;
        }

        // If we reached the point in time that we need to check our speed
        // limit, see if we've transferred enough data during this threshold. If
        // it fails this check then we fail because the download is going too
        // slowly.
        if now >= self.next_speed_check.get() {
            self.next_speed_check.set(now + self.timeout.dur);
            assert!(self.next_speed_check_bytes_threshold.get() > 0);
            let msg = format!(
                "download of `{}` failed to transfer more \
                 than {} bytes in {}s",
                dl.id,
                self.timeout.low_speed_limit,
                self.timeout.dur.as_secs()
            );
            dl.timed_out.set(Some(msg));
            return false;
        }

        true
    }
1077
1078    fn tick(&self, why: WhyTick<'_>) -> CargoResult<()> {
1079        let mut progress = self.progress.borrow_mut();
1080        let progress = progress.as_mut().unwrap();
1081
1082        if let WhyTick::DownloadUpdate = why {
1083            if !progress.update_allowed() {
1084                return Ok(());
1085            }
1086        }
1087        let pending = self.remaining();
1088        let mut msg = if pending == 1 {
1089            format!("{} crate", pending)
1090        } else {
1091            format!("{} crates", pending)
1092        };
1093        match why {
1094            WhyTick::Extracting(krate) => {
1095                msg.push_str(&format!(", extracting {} ...", krate));
1096            }
1097            _ => {
1098                let mut dur = Duration::new(0, 0);
1099                let mut remaining = 0;
1100                for (dl, _) in self.pending.values() {
1101                    dur += dl.start.elapsed();
1102                    // If the total/current look weird just throw out the data
1103                    // point, sounds like curl has more to learn before we have
1104                    // the true information.
1105                    if dl.total.get() >= dl.current.get() {
1106                        remaining += dl.total.get() - dl.current.get();
1107                    }
1108                }
1109                if remaining > 0 && dur > Duration::from_millis(500) {
1110                    msg.push_str(&format!(", remaining bytes: {:.1}", HumanBytes(remaining)));
1111                }
1112            }
1113        }
1114        progress.print_now(&msg)
1115    }
1116}
1117
/// Why a progress-bar refresh was requested; selects the message format used
/// by `Downloads::tick`.
#[derive(Copy, Clone)]
enum WhyTick<'a> {
    /// A new download was just enqueued.
    DownloadStarted,
    /// A periodic byte-count update from curl's progress callback.
    DownloadUpdate,
    /// A (small) crate finished downloading.
    DownloadFinished,
    /// A large crate finished downloading and is about to be extracted;
    /// carries the crate name for display.
    Extracting(&'a str),
}
1125
1126impl<'a, 'gctx> Drop for Downloads<'a, 'gctx> {
1127    fn drop(&mut self) {
1128        self.set.downloading.set(false);
1129        let progress = self.progress.get_mut().take().unwrap();
1130        // Don't print a download summary if we're not using a progress bar,
1131        // we've already printed lots of `Downloading...` items.
1132        if !progress.is_enabled() {
1133            return;
1134        }
1135        // If we didn't download anything, no need for a summary.
1136        if self.downloads_finished == 0 {
1137            return;
1138        }
1139        // If an error happened, let's not clutter up the output.
1140        if !self.success {
1141            return;
1142        }
1143        // pick the correct plural of crate(s)
1144        let crate_string = if self.downloads_finished == 1 {
1145            "crate"
1146        } else {
1147            "crates"
1148        };
1149        let mut status = format!(
1150            "{} {} ({:.1}) in {}",
1151            self.downloads_finished,
1152            crate_string,
1153            HumanBytes(self.downloaded_bytes),
1154            util::elapsed(self.start.elapsed())
1155        );
1156        // print the size of largest crate if it was >1mb
1157        // however don't print if only a single crate was downloaded
1158        // because it is obvious that it will be the largest then
1159        let mib_1 = 1024 * 1024;
1160        if self.largest.0 > mib_1 && self.downloads_finished > 1 {
1161            status.push_str(&format!(
1162                " (largest was `{}` at {:.1})",
1163                self.largest.1,
1164                HumanBytes(self.largest.0),
1165            ));
1166        }
1167        // Clear progress before displaying final summary.
1168        drop(progress);
1169        drop(self.set.gctx.shell().status("Downloaded", status));
1170    }
1171}
1172
/// Thread-local plumbing that lets curl's C callbacks (header/progress
/// closures) reach the active `Downloads` session, which they cannot borrow
/// directly because curl owns the closures for an unbounded lifetime.
mod tls {
    use std::cell::Cell;

    use super::Downloads;

    // The current `Downloads` stored as a raw address; 0 means "unset".
    thread_local!(static PTR: Cell<usize> = const { Cell::new(0) });

    /// Runs `f` with the `Downloads` installed on this thread by `set`, or
    /// with `None` when no download session is currently active.
    pub(crate) fn with<R>(f: impl FnOnce(Option<&Downloads<'_, '_>>) -> R) -> R {
        let ptr = PTR.with(|p| p.get());
        if ptr == 0 {
            f(None)
        } else {
            // SAFETY: a nonzero `PTR` is only ever written by `set` below,
            // which stores the address of a live `&Downloads` and restores
            // the previous value (via its drop guard) before that borrow
            // ends, so the pointer is valid for the duration of this call.
            unsafe { f(Some(&*(ptr as *const Downloads<'_, '_>))) }
        }
    }

    /// Installs `dl` as this thread's active download session for the
    /// duration of `f`, restoring the previous value afterwards — even if
    /// `f` panics — via the `Reset` drop guard.
    pub(crate) fn set<R>(dl: &Downloads<'_, '_>, f: impl FnOnce() -> R) -> R {
        // Writes the saved value back into the cell when dropped, including
        // during unwinding.
        struct Reset<'a, T: Copy>(&'a Cell<T>, T);

        impl<'a, T: Copy> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                self.0.set(self.1);
            }
        }

        PTR.with(|p| {
            let _reset = Reset(p, p.get());
            p.set(dl as *const Downloads<'_, '_> as usize);
            f()
        })
    }
}