use std::cell::{Cell, Ref, RefCell, RefMut};
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt;
use std::hash;
use std::mem;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::time::{Duration, Instant};

use anyhow::Context as _;
use cargo_util_schemas::manifest::{Hints, RustVersion};
use curl::easy::Easy;
use curl::multi::{EasyHandle, Multi};
use lazycell::LazyCell;
use semver::Version;
use serde::Serialize;
use tracing::debug;

use crate::core::compiler::{CompileKind, RustcTargetData};
use crate::core::dependency::DepKind;
use crate::core::resolver::features::ForceAllTargets;
use crate::core::resolver::{HasDevUnits, Resolve};
use crate::core::{
    CliUnstable, Dependency, Features, Manifest, PackageId, PackageIdSpec, SerializedDependency,
    SourceId, Target,
};
use crate::core::{Summary, Workspace};
use crate::sources::source::{MaybePackage, SourceMap};
use crate::util::HumanBytes;
use crate::util::cache_lock::{CacheLock, CacheLockMode};
use crate::util::errors::{CargoResult, HttpNotSuccessful};
use crate::util::interning::InternedString;
use crate::util::network::http::HttpTimeout;
use crate::util::network::http::http_handle_and_timeout;
use crate::util::network::retry::{Retry, RetryResult};
use crate::util::network::sleep::SleepTracker;
use crate::util::{self, GlobalContext, Progress, ProgressStyle, internal};

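/// A `Package` bundles a parsed `Manifest` with the path of the `Cargo.toml`
/// it was loaded from; cloning is cheap because the contents sit behind an
/// `Rc`.
///
/// Illustrative sketch (not compiled here): it assumes a `Manifest` value and
/// its path are already available from an earlier parsing step.
///
/// ```ignore
/// let pkg = Package::new(manifest, &manifest_path);
/// println!("{} v{} at {}", pkg.name(), pkg.version(), pkg.root().display());
/// for target in pkg.targets() {
///     println!("  target: {}", target.name());
/// }
/// ```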
#[derive(Clone)]
pub struct Package {
    inner: Rc<PackageInner>,
}

#[derive(Clone)]
struct PackageInner {
    manifest: Manifest,
    manifest_path: PathBuf,
}

impl Ord for Package {
    fn cmp(&self, other: &Package) -> Ordering {
        self.package_id().cmp(&other.package_id())
    }
}

impl PartialOrd for Package {
    fn partial_cmp(&self, other: &Package) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

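/// The flattened, serializable view of a [`Package`] produced by
/// [`Package::serialized`], meant to be emitted as JSON (as in
/// `cargo metadata`-style output).
///
/// Hedged sketch (not compiled here): the `serde_json` call is illustrative,
/// and `unstable_flags`/`cargo_features` are assumed to come from the caller.
///
/// ```ignore
/// let serialized = pkg.serialized(unstable_flags, cargo_features);
/// println!("{}", serde_json::to_string_pretty(&serialized)?);
/// ```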
#[derive(Serialize)]
pub struct SerializedPackage {
    name: InternedString,
    version: Version,
    id: PackageIdSpec,
    license: Option<String>,
    license_file: Option<String>,
    description: Option<String>,
    source: SourceId,
    dependencies: Vec<SerializedDependency>,
    targets: Vec<Target>,
    features: BTreeMap<InternedString, Vec<InternedString>>,
    manifest_path: PathBuf,
    metadata: Option<toml::Value>,
    publish: Option<Vec<String>>,
    authors: Vec<String>,
    categories: Vec<String>,
    keywords: Vec<String>,
    readme: Option<String>,
    repository: Option<String>,
    homepage: Option<String>,
    documentation: Option<String>,
    edition: String,
    links: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    metabuild: Option<Vec<String>>,
    default_run: Option<String>,
    rust_version: Option<RustVersion>,
    #[serde(skip_serializing_if = "Option::is_none")]
    hints: Option<Hints>,
}

impl Package {
    pub fn new(manifest: Manifest, manifest_path: &Path) -> Package {
        Package {
            inner: Rc::new(PackageInner {
                manifest,
                manifest_path: manifest_path.to_path_buf(),
            }),
        }
    }

    pub fn dependencies(&self) -> &[Dependency] {
        self.manifest().dependencies()
    }

    pub fn manifest(&self) -> &Manifest {
        &self.inner.manifest
    }

    pub fn manifest_mut(&mut self) -> &mut Manifest {
        &mut Rc::make_mut(&mut self.inner).manifest
    }

    pub fn manifest_path(&self) -> &Path {
        &self.inner.manifest_path
    }

    pub fn name(&self) -> InternedString {
        self.package_id().name()
    }

    pub fn package_id(&self) -> PackageId {
        self.manifest().package_id()
    }

    pub fn root(&self) -> &Path {
        self.manifest_path().parent().unwrap()
    }

    pub fn summary(&self) -> &Summary {
        self.manifest().summary()
    }

    pub fn targets(&self) -> &[Target] {
        self.manifest().targets()
    }

    pub fn library(&self) -> Option<&Target> {
        self.targets().iter().find(|t| t.is_lib())
    }

    pub fn version(&self) -> &Version {
        self.package_id().version()
    }

    pub fn authors(&self) -> &Vec<String> {
        &self.manifest().metadata().authors
    }

    pub fn publish(&self) -> &Option<Vec<String>> {
        self.manifest().publish()
    }

    pub fn proc_macro(&self) -> bool {
        self.targets().iter().any(|target| target.proc_macro())
    }

    pub fn rust_version(&self) -> Option<&RustVersion> {
        self.manifest().rust_version()
    }

    pub fn hints(&self) -> Option<&Hints> {
        self.manifest().hints()
    }

    pub fn has_custom_build(&self) -> bool {
        self.targets().iter().any(|t| t.is_custom_build())
    }

    pub fn map_source(self, to_replace: SourceId, replace_with: SourceId) -> Package {
        Package {
            inner: Rc::new(PackageInner {
                manifest: self.manifest().clone().map_source(to_replace, replace_with),
                manifest_path: self.manifest_path().to_owned(),
            }),
        }
    }

    pub fn serialized(
        &self,
        unstable_flags: &CliUnstable,
        cargo_features: &Features,
    ) -> SerializedPackage {
        let summary = self.manifest().summary();
        let package_id = summary.package_id();
        let manmeta = self.manifest().metadata();
        let targets: Vec<Target> = self
            .manifest()
            .targets()
            .iter()
            .filter(|t| t.src_path().is_path())
            .cloned()
            .collect();
        let crate_features = summary
            .features()
            .iter()
            .map(|(k, v)| (*k, v.iter().map(|fv| fv.to_string().into()).collect()))
            .collect();

        SerializedPackage {
            name: package_id.name(),
            version: package_id.version().clone(),
            id: package_id.to_spec(),
            license: manmeta.license.clone(),
            license_file: manmeta.license_file.clone(),
            description: manmeta.description.clone(),
            source: summary.source_id(),
            dependencies: summary
                .dependencies()
                .iter()
                .map(|dep| dep.serialized(unstable_flags, cargo_features))
                .collect(),
            targets,
            features: crate_features,
            manifest_path: self.manifest_path().to_path_buf(),
            metadata: self.manifest().custom_metadata().cloned(),
            authors: manmeta.authors.clone(),
            categories: manmeta.categories.clone(),
            keywords: manmeta.keywords.clone(),
            readme: manmeta.readme.clone(),
            repository: manmeta.repository.clone(),
            homepage: manmeta.homepage.clone(),
            documentation: manmeta.documentation.clone(),
            edition: self.manifest().edition().to_string(),
            links: self.manifest().links().map(|s| s.to_owned()),
            metabuild: self.manifest().metabuild().cloned(),
            publish: self.publish().as_ref().cloned(),
            default_run: self.manifest().default_run().map(|s| s.to_owned()),
            rust_version: self.rust_version().cloned(),
            hints: self.hints().cloned(),
        }
    }
}

impl fmt::Display for Package {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.summary().package_id())
    }
}

impl fmt::Debug for Package {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Package")
            .field("id", &self.summary().package_id())
            .field("..", &"..")
            .finish()
    }
}

impl PartialEq for Package {
    fn eq(&self, other: &Package) -> bool {
        self.package_id() == other.package_id()
    }
}

impl Eq for Package {}

impl hash::Hash for Package {
    fn hash<H: hash::Hasher>(&self, into: &mut H) {
        self.package_id().hash(into)
    }
}

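/// A set of packages, keyed by `PackageId`, whose contents are downloaded
/// lazily: each id maps to a slot that is only filled once the package has
/// actually been obtained from its source.
///
/// Hedged sketch of typical use (not compiled here; `package_ids`, `sources`,
/// and `gctx` are assumed to exist in the caller):
///
/// ```ignore
/// let set = PackageSet::new(&package_ids, sources, gctx)?;
/// let ids: Vec<_> = set.package_ids().collect();
/// // Downloads (or reuses) every requested package and hands back references
/// // that live as long as the set itself.
/// for pkg in set.get_many(ids)? {
///     println!("{}", pkg.package_id());
/// }
/// ```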
pub struct PackageSet<'gctx> {
    packages: HashMap<PackageId, LazyCell<Package>>,
    sources: RefCell<SourceMap<'gctx>>,
    gctx: &'gctx GlobalContext,
    multi: Multi,
    downloading: Cell<bool>,
    multiplexing: bool,
}

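/// Downloading state created by [`PackageSet::enable_download`].
///
/// Downloads are driven by calling [`Downloads::start`] for each wanted
/// `PackageId` and then calling [`Downloads::wait`] until nothing remains
/// pending, mirroring what [`PackageSet::get_many`] does internally. Hedged
/// sketch (not compiled here; `set` and `ids` are assumed to exist):
///
/// ```ignore
/// let mut downloads = set.enable_download()?;
/// let mut pkgs = Vec::new();
/// for id in ids {
///     // `start` returns the package immediately if it is already available.
///     pkgs.extend(downloads.start(id)?);
/// }
/// while downloads.remaining() > 0 {
///     pkgs.push(downloads.wait()?);
/// }
/// downloads.success = true;
/// ```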
pub struct Downloads<'a, 'gctx> {
    set: &'a PackageSet<'gctx>,
    pending: HashMap<usize, (Download<'gctx>, EasyHandle)>,
    pending_ids: HashSet<PackageId>,
    sleeping: SleepTracker<(Download<'gctx>, Easy)>,
    results: Vec<(usize, Result<(), curl::Error>)>,
    next: usize,
    progress: RefCell<Option<Progress<'gctx>>>,
    downloads_finished: usize,
    downloaded_bytes: u64,
    largest: (u64, InternedString),
    start: Instant,
    success: bool,

    timeout: HttpTimeout,
    updated_at: Cell<Instant>,
    next_speed_check: Cell<Instant>,
    next_speed_check_bytes_threshold: Cell<u64>,
    _lock: CacheLock<'gctx>,
}

struct Download<'gctx> {
    token: usize,

    id: PackageId,

    data: RefCell<Vec<u8>>,

    headers: RefCell<Vec<String>>,

    url: String,

    descriptor: String,

    total: Cell<u64>,
    current: Cell<u64>,

    start: Instant,
    timed_out: Cell<Option<String>>,

    retry: Retry<'gctx>,
}

impl<'gctx> PackageSet<'gctx> {
    pub fn new(
        package_ids: &[PackageId],
        sources: SourceMap<'gctx>,
        gctx: &'gctx GlobalContext,
    ) -> CargoResult<PackageSet<'gctx>> {
        let mut multi = Multi::new();
        let multiplexing = gctx.http_config()?.multiplexing.unwrap_or(true);
        multi
            .pipelining(false, multiplexing)
            .context("failed to enable multiplexing/pipelining in curl")?;

        multi.set_max_host_connections(2)?;

        Ok(PackageSet {
            packages: package_ids
                .iter()
                .map(|&id| (id, LazyCell::new()))
                .collect(),
            sources: RefCell::new(sources),
            gctx,
            multi,
            downloading: Cell::new(false),
            multiplexing,
        })
    }

    pub fn package_ids(&self) -> impl Iterator<Item = PackageId> + '_ {
        self.packages.keys().cloned()
    }

    pub fn packages(&self) -> impl Iterator<Item = &Package> {
        self.packages.values().filter_map(|p| p.borrow())
    }

    pub fn enable_download<'a>(&'a self) -> CargoResult<Downloads<'a, 'gctx>> {
        assert!(!self.downloading.replace(true));
        let timeout = HttpTimeout::new(self.gctx)?;
        Ok(Downloads {
            start: Instant::now(),
            set: self,
            next: 0,
            pending: HashMap::new(),
            pending_ids: HashSet::new(),
            sleeping: SleepTracker::new(),
            results: Vec::new(),
            progress: RefCell::new(Some(Progress::with_style(
                "Downloading",
                ProgressStyle::Ratio,
                self.gctx,
            ))),
            downloads_finished: 0,
            downloaded_bytes: 0,
            largest: (0, "".into()),
            success: false,
            updated_at: Cell::new(Instant::now()),
            timeout,
            next_speed_check: Cell::new(Instant::now()),
            next_speed_check_bytes_threshold: Cell::new(0),
            _lock: self
                .gctx
                .acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?,
        })
    }

    pub fn get_one(&self, id: PackageId) -> CargoResult<&Package> {
        if let Some(pkg) = self.packages.get(&id).and_then(|slot| slot.borrow()) {
            return Ok(pkg);
        }
        Ok(self.get_many(Some(id))?.remove(0))
    }

    pub fn get_many(&self, ids: impl IntoIterator<Item = PackageId>) -> CargoResult<Vec<&Package>> {
        let mut pkgs = Vec::new();
        let _lock = self
            .gctx
            .acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?;
        let mut downloads = self.enable_download()?;
        for id in ids {
            pkgs.extend(downloads.start(id)?);
        }
        while downloads.remaining() > 0 {
            pkgs.push(downloads.wait()?);
        }
        downloads.success = true;
        drop(downloads);

        let mut deferred = self.gctx.deferred_global_last_use()?;
        deferred.save_no_error(self.gctx);
        Ok(pkgs)
    }

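    // Downloads every package reachable from `root_ids` in the resolve graph
    // for the requested compile kinds. The nested `collect_used_deps` walks
    // dependencies recursively, additionally following artifact dependencies
    // under the compile kind their `target` field resolves to, and the
    // collected ids are then fetched in a single `get_many` call.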
    #[tracing::instrument(skip_all)]
    pub fn download_accessible(
        &self,
        resolve: &Resolve,
        root_ids: &[PackageId],
        has_dev_units: HasDevUnits,
        requested_kinds: &[CompileKind],
        target_data: &RustcTargetData<'gctx>,
        force_all_targets: ForceAllTargets,
    ) -> CargoResult<()> {
        fn collect_used_deps(
            used: &mut BTreeSet<(PackageId, CompileKind)>,
            resolve: &Resolve,
            pkg_id: PackageId,
            has_dev_units: HasDevUnits,
            requested_kind: CompileKind,
            target_data: &RustcTargetData<'_>,
            force_all_targets: ForceAllTargets,
        ) -> CargoResult<()> {
            if !used.insert((pkg_id, requested_kind)) {
                return Ok(());
            }
            let requested_kinds = &[requested_kind];
            let filtered_deps = PackageSet::filter_deps(
                pkg_id,
                resolve,
                has_dev_units,
                requested_kinds,
                target_data,
                force_all_targets,
            );
            for (pkg_id, deps) in filtered_deps {
                collect_used_deps(
                    used,
                    resolve,
                    pkg_id,
                    has_dev_units,
                    requested_kind,
                    target_data,
                    force_all_targets,
                )?;
                let artifact_kinds = deps.iter().filter_map(|dep| {
                    Some(
                        dep.artifact()?
                            .target()?
                            .to_resolved_compile_kind(*requested_kinds.iter().next().unwrap()),
                    )
                });
                for artifact_kind in artifact_kinds {
                    collect_used_deps(
                        used,
                        resolve,
                        pkg_id,
                        has_dev_units,
                        artifact_kind,
                        target_data,
                        force_all_targets,
                    )?;
                }
            }
            Ok(())
        }

        let mut to_download = BTreeSet::new();

        for id in root_ids {
            for requested_kind in requested_kinds {
                collect_used_deps(
                    &mut to_download,
                    resolve,
                    *id,
                    has_dev_units,
                    *requested_kind,
                    target_data,
                    force_all_targets,
                )?;
            }
        }
        let to_download = to_download
            .into_iter()
            .map(|(p, _)| p)
            .collect::<BTreeSet<_>>();
        self.get_many(to_download.into_iter())?;
        Ok(())
    }

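    // Warns about dependencies that are depended upon as libraries (including
    // `lib` artifact dependencies) but whose package does not actually provide
    // a lib target.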
    pub(crate) fn warn_no_lib_packages_and_artifact_libs_overlapping_deps(
        &self,
        ws: &Workspace<'gctx>,
        resolve: &Resolve,
        root_ids: &[PackageId],
        has_dev_units: HasDevUnits,
        requested_kinds: &[CompileKind],
        target_data: &RustcTargetData<'_>,
        force_all_targets: ForceAllTargets,
    ) -> CargoResult<()> {
        let no_lib_pkgs: BTreeMap<PackageId, Vec<(&Package, &HashSet<Dependency>)>> = root_ids
            .iter()
            .map(|&root_id| {
                let dep_pkgs_to_deps: Vec<_> = PackageSet::filter_deps(
                    root_id,
                    resolve,
                    has_dev_units,
                    requested_kinds,
                    target_data,
                    force_all_targets,
                )
                .collect();

                let dep_pkgs_and_deps = dep_pkgs_to_deps
                    .into_iter()
                    .filter(|(_id, deps)| deps.iter().any(|dep| dep.maybe_lib()))
                    .filter_map(|(dep_package_id, deps)| {
                        self.get_one(dep_package_id).ok().and_then(|dep_pkg| {
                            (!dep_pkg.targets().iter().any(|t| t.is_lib())).then(|| (dep_pkg, deps))
                        })
                    })
                    .collect();
                (root_id, dep_pkgs_and_deps)
            })
            .collect();

        for (pkg_id, dep_pkgs) in no_lib_pkgs {
            for (_dep_pkg_without_lib_target, deps) in dep_pkgs {
                for dep in deps.iter().filter(|dep| {
                    dep.artifact()
                        .map(|artifact| artifact.is_lib())
                        .unwrap_or(true)
                }) {
                    ws.gctx().shell().warn(&format!(
                        "{} ignoring invalid dependency `{}` which is missing a lib target",
                        pkg_id,
                        dep.name_in_toml(),
                    ))?;
                }
            }
        }
        Ok(())
    }

    fn filter_deps<'a>(
        pkg_id: PackageId,
        resolve: &'a Resolve,
        has_dev_units: HasDevUnits,
        requested_kinds: &'a [CompileKind],
        target_data: &'a RustcTargetData<'_>,
        force_all_targets: ForceAllTargets,
    ) -> impl Iterator<Item = (PackageId, &'a HashSet<Dependency>)> + 'a {
        resolve
            .deps(pkg_id)
            .filter(move |&(_id, deps)| {
                deps.iter().any(|dep| {
                    if dep.kind() == DepKind::Development && has_dev_units == HasDevUnits::No {
                        return false;
                    }
                    if force_all_targets == ForceAllTargets::No {
                        let activated = requested_kinds
                            .iter()
                            .chain(Some(&CompileKind::Host))
                            .any(|kind| target_data.dep_platform_activated(dep, *kind));
                        if !activated {
                            return false;
                        }
                    }
                    true
                })
            })
            .into_iter()
    }

    pub fn sources(&self) -> Ref<'_, SourceMap<'gctx>> {
        self.sources.borrow()
    }

    pub fn sources_mut(&self) -> RefMut<'_, SourceMap<'gctx>> {
        self.sources.borrow_mut()
    }

    pub fn add_set(&mut self, set: PackageSet<'gctx>) {
        assert!(!self.downloading.get());
        assert!(!set.downloading.get());
        for (pkg_id, p_cell) in set.packages {
            self.packages.entry(pkg_id).or_insert(p_cell);
        }
        let mut sources = self.sources.borrow_mut();
        let other_sources = set.sources.into_inner();
        sources.add_source_map(other_sources);
    }
}

impl<'a, 'gctx> Downloads<'a, 'gctx> {
    #[tracing::instrument(skip_all)]
    pub fn start(&mut self, id: PackageId) -> CargoResult<Option<&'a Package>> {
        self.start_inner(id)
            .with_context(|| format!("failed to download `{}`", id))
    }

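    // Looks up the slot for `id`, asks its source for the package, and if the
    // source reports that a real download is needed, configures a curl `Easy`
    // handle whose write/header/progress callbacks reach back into this
    // `Downloads` value through the `tls` shim at the bottom of this file.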
    fn start_inner(&mut self, id: PackageId) -> CargoResult<Option<&'a Package>> {
        let slot = self
            .set
            .packages
            .get(&id)
            .ok_or_else(|| internal(format!("couldn't find `{}` in package set", id)))?;
        if let Some(pkg) = slot.borrow() {
            return Ok(Some(pkg));
        }

        let mut sources = self.set.sources.borrow_mut();
        let source = sources
            .get_mut(id.source_id())
            .ok_or_else(|| internal(format!("couldn't find source for `{}`", id)))?;
        let pkg = source
            .download(id)
            .context("unable to get packages from source")?;
        let (url, descriptor, authorization) = match pkg {
            MaybePackage::Ready(pkg) => {
                debug!("{} doesn't need a download", id);
                assert!(slot.fill(pkg).is_ok());
                return Ok(Some(slot.borrow().unwrap()));
            }
            MaybePackage::Download {
                url,
                descriptor,
                authorization,
            } => (url, descriptor, authorization),
        };

        let token = self.next;
        self.next += 1;
        debug!(target: "network", "downloading {} as {}", id, token);
        assert!(self.pending_ids.insert(id));

        let (mut handle, _timeout) = http_handle_and_timeout(self.set.gctx)?;
        handle.get(true)?;
        handle.url(&url)?;
        handle.follow_location(true)?;
        if let Some(authorization) = authorization {
            let mut headers = curl::easy::List::new();
            headers.append(&format!("Authorization: {}", authorization))?;
            handle.http_headers(headers)?;
        }

        crate::try_old_curl_http2_pipewait!(self.set.multiplexing, handle);

        handle.write_function(move |buf| {
            debug!(target: "network", "{} - {} bytes of data", token, buf.len());
            tls::with(|downloads| {
                if let Some(downloads) = downloads {
                    downloads.pending[&token]
                        .0
                        .data
                        .borrow_mut()
                        .extend_from_slice(buf);
                }
            });
            Ok(buf.len())
        })?;
        handle.header_function(move |data| {
            tls::with(|downloads| {
                if let Some(downloads) = downloads {
                    let h = String::from_utf8_lossy(data).trim().to_string();
                    downloads.pending[&token].0.headers.borrow_mut().push(h);
                }
            });
            true
        })?;

        handle.progress(true)?;
        handle.progress_function(move |dl_total, dl_cur, _, _| {
            tls::with(|downloads| match downloads {
                Some(d) => d.progress(token, dl_total as u64, dl_cur as u64),
                None => false,
            })
        })?;

        if self.downloads_finished == 0
            && self.pending.is_empty()
            && !self.progress.borrow().as_ref().unwrap().is_enabled()
        {
            self.set.gctx.shell().status("Downloading", "crates ...")?;
        }

        let dl = Download {
            token,
            data: RefCell::new(Vec::new()),
            headers: RefCell::new(Vec::new()),
            id,
            url,
            descriptor,
            total: Cell::new(0),
            current: Cell::new(0),
            start: Instant::now(),
            timed_out: Cell::new(None),
            retry: Retry::new(self.set.gctx)?,
        };
        self.enqueue(dl, handle)?;
        self.tick(WhyTick::DownloadStarted)?;

        Ok(None)
    }

    pub fn remaining(&self) -> usize {
        self.pending.len() + self.sleeping.len()
    }

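    // Blocks until one pending download completes, retrying failed transfers
    // via the `sleeping` queue, then hands the downloaded body to the source's
    // `finish_download` and fills the corresponding package slot.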
    #[tracing::instrument(skip_all)]
    pub fn wait(&mut self) -> CargoResult<&'a Package> {
        let (dl, data) = loop {
            assert_eq!(self.pending.len(), self.pending_ids.len());
            let (token, result) = self.wait_for_curl()?;
            debug!(target: "network", "{} finished with {:?}", token, result);

            let (mut dl, handle) = self
                .pending
                .remove(&token)
                .expect("got a token for a non-in-progress transfer");
            let data = mem::take(&mut *dl.data.borrow_mut());
            let headers = mem::take(&mut *dl.headers.borrow_mut());
            let mut handle = self.set.multi.remove(handle)?;
            self.pending_ids.remove(&dl.id);

            let ret = {
                let timed_out = &dl.timed_out;
                let url = &dl.url;
                dl.retry.r#try(|| {
                    if let Err(e) = result {
                        if !e.is_aborted_by_callback() {
                            return Err(e.into());
                        }

                        return Err(match timed_out.replace(None) {
                            Some(msg) => {
                                let code = curl_sys::CURLE_OPERATION_TIMEDOUT;
                                let mut err = curl::Error::new(code);
                                err.set_extra(msg);
                                err
                            }
                            None => e,
                        }
                        .into());
                    }

                    let code = handle.response_code()?;
                    if code != 200 && code != 0 {
                        return Err(HttpNotSuccessful::new_from_handle(
                            &mut handle,
                            &url,
                            data,
                            headers,
                        )
                        .into());
                    }
                    Ok(data)
                })
            };
            match ret {
                RetryResult::Success(data) => break (dl, data),
                RetryResult::Err(e) => {
                    return Err(e.context(format!("failed to download from `{}`", dl.url)));
                }
                RetryResult::Retry(sleep) => {
                    debug!(target: "network", "download retry {} for {sleep}ms", dl.url);
                    self.sleeping.push(sleep, (dl, handle));
                }
            }
        };

        self.progress.borrow_mut().as_mut().unwrap().clear();
        self.set.gctx.shell().status("Downloaded", &dl.descriptor)?;

        self.downloads_finished += 1;
        self.downloaded_bytes += dl.total.get();
        if dl.total.get() > self.largest.0 {
            self.largest = (dl.total.get(), dl.id.name());
        }

        let kib_400 = 1024 * 400;
        if dl.total.get() < kib_400 {
            self.tick(WhyTick::DownloadFinished)?;
        } else {
            self.tick(WhyTick::Extracting(&dl.id.name()))?;
        }

        let mut sources = self.set.sources.borrow_mut();
        let source = sources
            .get_mut(dl.id.source_id())
            .ok_or_else(|| internal(format!("couldn't find source for `{}`", dl.id)))?;
        let start = Instant::now();
        let pkg = source.finish_download(dl.id, data)?;

        let finish_dur = start.elapsed();
        self.updated_at.set(self.updated_at.get() + finish_dur);
        self.next_speed_check
            .set(self.next_speed_check.get() + finish_dur);

        let slot = &self.set.packages[&dl.id];
        assert!(slot.fill(pkg).is_ok());
        Ok(slot.borrow().unwrap())
    }

    fn enqueue(&mut self, dl: Download<'gctx>, handle: Easy) -> CargoResult<()> {
        let mut handle = self.set.multi.add(handle)?;
        let now = Instant::now();
        handle.set_token(dl.token)?;
        self.updated_at.set(now);
        self.next_speed_check.set(now + self.timeout.dur);
        self.next_speed_check_bytes_threshold
            .set(u64::from(self.timeout.low_speed_limit));
        dl.timed_out.set(None);
        dl.current.set(0);
        dl.total.set(0);
        self.pending.insert(dl.token, (dl, handle));
        Ok(())
    }

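    // Blocks until curl reports that at least one transfer finished (either
    // successfully or with an error), returning its token and result. Each
    // iteration re-queues any sleeping retries, lets the `Multi` session make
    // progress, drains its message queue into `self.results`, and then either
    // sleeps until the next retry is due or waits on curl's sockets.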
    fn wait_for_curl(&mut self) -> CargoResult<(usize, Result<(), curl::Error>)> {
        loop {
            self.add_sleepers()?;
            let n = tls::set(self, || {
                self.set
                    .multi
                    .perform()
                    .context("failed to perform http requests")
            })?;
            debug!(target: "network", "handles remaining: {}", n);
            let results = &mut self.results;
            let pending = &self.pending;
            self.set.multi.messages(|msg| {
                let token = msg.token().expect("failed to read token");
                let handle = &pending[&token].1;
                if let Some(result) = msg.result_for(handle) {
                    results.push((token, result));
                } else {
                    debug!(target: "network", "message without a result (?)");
                }
            });

            if let Some(pair) = results.pop() {
                break Ok(pair);
            }
            assert_ne!(self.remaining(), 0);
            if self.pending.is_empty() {
                let delay = self.sleeping.time_to_next().unwrap();
                debug!(target: "network", "sleeping main thread for {delay:?}");
                std::thread::sleep(delay);
            } else {
                let min_timeout = Duration::new(1, 0);
                let timeout = self.set.multi.get_timeout()?.unwrap_or(min_timeout);
                let timeout = timeout.min(min_timeout);
                self.set
                    .multi
                    .wait(&mut [], timeout)
                    .context("failed to wait on curl `Multi`")?;
            }
        }
    }

    fn add_sleepers(&mut self) -> CargoResult<()> {
        for (dl, handle) in self.sleeping.to_retry() {
            self.pending_ids.insert(dl.id);
            self.enqueue(dl, handle)?;
        }
        Ok(())
    }

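    // Progress callback invoked by curl. Besides refreshing the progress bar,
    // it implements the low-speed timeout: `next_speed_check_bytes_threshold`
    // counts down as bytes arrive, and only once a callback's delta clears the
    // remaining threshold is the `next_speed_check` deadline pushed out again.
    //
    // Worked example (numbers are illustrative, not necessarily cargo's
    // defaults): with `low_speed_limit = 10` bytes and `dur = 30` seconds,
    // receiving 4 bytes drops the threshold to 6; a later callback delivering
    // 7 more bytes clears it and re-arms the 30-second window; if no data
    // arrives for 30 seconds, one of the two timeout branches below fires and
    // returning `false` makes curl abort the transfer.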
    fn progress(&self, token: usize, total: u64, cur: u64) -> bool {
        let dl = &self.pending[&token].0;
        dl.total.set(total);
        let now = Instant::now();
        if cur > dl.current.get() {
            let delta = cur - dl.current.get();
            let threshold = self.next_speed_check_bytes_threshold.get();

            dl.current.set(cur);
            self.updated_at.set(now);

            if delta >= threshold {
                self.next_speed_check.set(now + self.timeout.dur);
                self.next_speed_check_bytes_threshold
                    .set(u64::from(self.timeout.low_speed_limit));
            } else {
                self.next_speed_check_bytes_threshold.set(threshold - delta);
            }
        }
        if self.tick(WhyTick::DownloadUpdate).is_err() {
            return false;
        }

        if now > self.updated_at.get() + self.timeout.dur {
            self.updated_at.set(now);
            let msg = format!(
                "failed to download any data for `{}` within {}s",
                dl.id,
                self.timeout.dur.as_secs()
            );
            dl.timed_out.set(Some(msg));
            return false;
        }

        if now >= self.next_speed_check.get() {
            self.next_speed_check.set(now + self.timeout.dur);
            assert!(self.next_speed_check_bytes_threshold.get() > 0);
            let msg = format!(
                "download of `{}` failed to transfer more \
                 than {} bytes in {}s",
                dl.id,
                self.timeout.low_speed_limit,
                self.timeout.dur.as_secs()
            );
            dl.timed_out.set(Some(msg));
            return false;
        }

        true
    }

    fn tick(&self, why: WhyTick<'_>) -> CargoResult<()> {
        let mut progress = self.progress.borrow_mut();
        let progress = progress.as_mut().unwrap();

        if let WhyTick::DownloadUpdate = why {
            if !progress.update_allowed() {
                return Ok(());
            }
        }
        let pending = self.remaining();
        let mut msg = if pending == 1 {
            format!("{} crate", pending)
        } else {
            format!("{} crates", pending)
        };
        match why {
            WhyTick::Extracting(krate) => {
                msg.push_str(&format!(", extracting {} ...", krate));
            }
            _ => {
                let mut dur = Duration::new(0, 0);
                let mut remaining = 0;
                for (dl, _) in self.pending.values() {
                    dur += dl.start.elapsed();
                    if dl.total.get() >= dl.current.get() {
                        remaining += dl.total.get() - dl.current.get();
                    }
                }
                if remaining > 0 && dur > Duration::from_millis(500) {
                    msg.push_str(&format!(", remaining bytes: {:.1}", HumanBytes(remaining)));
                }
            }
        }
        progress.print_now(&msg)
    }
}

#[derive(Copy, Clone)]
enum WhyTick<'a> {
    DownloadStarted,
    DownloadUpdate,
    DownloadFinished,
    Extracting(&'a str),
}

impl<'a, 'gctx> Drop for Downloads<'a, 'gctx> {
    fn drop(&mut self) {
        self.set.downloading.set(false);
        let progress = self.progress.get_mut().take().unwrap();
        if !progress.is_enabled() {
            return;
        }
        if self.downloads_finished == 0 {
            return;
        }
        if !self.success {
            return;
        }
        let crate_string = if self.downloads_finished == 1 {
            "crate"
        } else {
            "crates"
        };
        let mut status = format!(
            "{} {} ({:.1}) in {}",
            self.downloads_finished,
            crate_string,
            HumanBytes(self.downloaded_bytes),
            util::elapsed(self.start.elapsed())
        );
        let mib_1 = 1024 * 1024;
        if self.largest.0 > mib_1 && self.downloads_finished > 1 {
            status.push_str(&format!(
                " (largest was `{}` at {:.1})",
                self.largest.1,
                HumanBytes(self.largest.0),
            ));
        }
        drop(progress);
        drop(self.set.gctx.shell().status("Downloaded", status));
    }
}

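/// Helpers for passing a `&Downloads` into curl's C callbacks via a scoped
/// thread-local raw pointer: `set` stores the pointer for the duration of a
/// closure (restoring the previous value on drop, even on panic), and `with`
/// rematerializes the reference inside the callbacks, or hands out `None` if
/// no download loop is currently active.
///
/// Hedged sketch of the calling pattern used above (not compiled here):
///
/// ```ignore
/// let n = tls::set(self, || {
///     // While this closure runs, curl callbacks can reach `self` again.
///     self.set.multi.perform().context("failed to perform http requests")
/// })?;
/// tls::with(|downloads| {
///     if let Some(downloads) = downloads {
///         // Touch per-download state keyed by token.
///     }
/// });
/// ```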
mod tls {
    use std::cell::Cell;

    use super::Downloads;

    thread_local!(static PTR: Cell<usize> = const { Cell::new(0) });

    pub(crate) fn with<R>(f: impl FnOnce(Option<&Downloads<'_, '_>>) -> R) -> R {
        let ptr = PTR.with(|p| p.get());
        if ptr == 0 {
            f(None)
        } else {
            unsafe { f(Some(&*(ptr as *const Downloads<'_, '_>))) }
        }
    }

    pub(crate) fn set<R>(dl: &Downloads<'_, '_>, f: impl FnOnce() -> R) -> R {
        struct Reset<'a, T: Copy>(&'a Cell<T>, T);

        impl<'a, T: Copy> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                self.0.set(self.1);
            }
        }

        PTR.with(|p| {
            let _reset = Reset(p, p.get());
            p.set(dl as *const Downloads<'_, '_> as usize);
            f()
        })
    }
}