use std::cell::{Cell, Ref, RefCell, RefMut};
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt;
use std::hash;
use std::mem;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::time::{Duration, Instant};

use anyhow::Context as _;
use cargo_util_schemas::manifest::RustVersion;
use curl::easy::Easy;
use curl::multi::{EasyHandle, Multi};
use lazycell::LazyCell;
use semver::Version;
use serde::Serialize;
use tracing::debug;

use crate::core::compiler::{CompileKind, RustcTargetData};
use crate::core::dependency::DepKind;
use crate::core::resolver::features::ForceAllTargets;
use crate::core::resolver::{HasDevUnits, Resolve};
use crate::core::{
    CliUnstable, Dependency, Features, Manifest, PackageId, PackageIdSpec, SerializedDependency,
    SourceId, Target,
};
use crate::core::{Summary, Workspace};
use crate::sources::source::{MaybePackage, SourceMap};
use crate::util::cache_lock::{CacheLock, CacheLockMode};
use crate::util::errors::{CargoResult, HttpNotSuccessful};
use crate::util::interning::InternedString;
use crate::util::network::http::http_handle_and_timeout;
use crate::util::network::http::HttpTimeout;
use crate::util::network::retry::{Retry, RetryResult};
use crate::util::network::sleep::SleepTracker;
use crate::util::HumanBytes;
use crate::util::{self, internal, GlobalContext, Progress, ProgressStyle};

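/// A package loaded from a `Cargo.toml` manifest on disk.
///
/// Cloning is cheap: the parsed manifest and its path are shared behind an
/// `Rc`, and `manifest_mut` copies on write via `Rc::make_mut`.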
#[derive(Clone)]
pub struct Package {
    inner: Rc<PackageInner>,
}

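/// Shared state behind `Package`: the parsed manifest plus the path of the
/// `Cargo.toml` it was loaded from.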
#[derive(Clone)]
struct PackageInner {
    manifest: Manifest,
    manifest_path: PathBuf,
}

impl Ord for Package {
    fn cmp(&self, other: &Package) -> Ordering {
        self.package_id().cmp(&other.package_id())
    }
}

impl PartialOrd for Package {
    fn partial_cmp(&self, other: &Package) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

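/// A JSON-serializable view of a package, built by [`Package::serialized`]
/// for machine-readable output.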
#[derive(Serialize)]
pub struct SerializedPackage {
    name: InternedString,
    version: Version,
    id: PackageIdSpec,
    license: Option<String>,
    license_file: Option<String>,
    description: Option<String>,
    source: SourceId,
    dependencies: Vec<SerializedDependency>,
    targets: Vec<Target>,
    features: BTreeMap<InternedString, Vec<InternedString>>,
    manifest_path: PathBuf,
    metadata: Option<toml::Value>,
    publish: Option<Vec<String>>,
    authors: Vec<String>,
    categories: Vec<String>,
    keywords: Vec<String>,
    readme: Option<String>,
    repository: Option<String>,
    homepage: Option<String>,
    documentation: Option<String>,
    edition: String,
    links: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    metabuild: Option<Vec<String>>,
    default_run: Option<String>,
    rust_version: Option<RustVersion>,
}

impl Package {
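    /// Creates a package from a manifest and the path of the `Cargo.toml` it
    /// was read from.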
    pub fn new(manifest: Manifest, manifest_path: &Path) -> Package {
        Package {
            inner: Rc::new(PackageInner {
                manifest,
                manifest_path: manifest_path.to_path_buf(),
            }),
        }
    }

    /// Gets the manifest dependencies.
    pub fn dependencies(&self) -> &[Dependency] {
        self.manifest().dependencies()
    }
    /// Gets the manifest.
    pub fn manifest(&self) -> &Manifest {
        &self.inner.manifest
    }
    /// Gets the manifest mutably, copying the shared inner state if needed.
    pub fn manifest_mut(&mut self) -> &mut Manifest {
        &mut Rc::make_mut(&mut self.inner).manifest
    }
    /// Gets the path to the manifest.
    pub fn manifest_path(&self) -> &Path {
        &self.inner.manifest_path
    }
    /// Gets the name of the package.
    pub fn name(&self) -> InternedString {
        self.package_id().name()
    }
    /// Gets the `PackageId` object for the package (fully defines a package).
    pub fn package_id(&self) -> PackageId {
        self.manifest().package_id()
    }
    /// Gets the root folder of the package (the directory containing `Cargo.toml`).
    pub fn root(&self) -> &Path {
        self.manifest_path().parent().unwrap()
    }
    /// Gets the summary for the package.
    pub fn summary(&self) -> &Summary {
        self.manifest().summary()
    }
    /// Gets the targets specified in the manifest.
    pub fn targets(&self) -> &[Target] {
        self.manifest().targets()
    }
    /// Gets the library target, if this package has one.
    pub fn library(&self) -> Option<&Target> {
        self.targets().iter().find(|t| t.is_lib())
    }
    /// Gets the current package version.
    pub fn version(&self) -> &Version {
        self.package_id().version()
    }
    /// Gets the package authors.
    pub fn authors(&self) -> &Vec<String> {
        &self.manifest().metadata().authors
    }

    /// Returns `None` if the package is set to publish.
    /// Returns `Some(allowed_registries)` if publishing is restricted.
    pub fn publish(&self) -> &Option<Vec<String>> {
        self.manifest().publish()
    }
    /// Returns `true` if this package is a proc-macro.
    pub fn proc_macro(&self) -> bool {
        self.targets().iter().any(|target| target.proc_macro())
    }
    /// Gets the package's minimum Rust version (`rust-version` in the manifest), if any.
    pub fn rust_version(&self) -> Option<&RustVersion> {
        self.manifest().rust_version()
    }

    /// Returns `true` if any target of this package uses a custom build script.
    pub fn has_custom_build(&self) -> bool {
        self.targets().iter().any(|t| t.is_custom_build())
    }

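    /// Returns a copy of this package whose manifest has occurrences of the
    /// `to_replace` source replaced with `replace_with` (see
    /// `Manifest::map_source`).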
    pub fn map_source(self, to_replace: SourceId, replace_with: SourceId) -> Package {
        Package {
            inner: Rc::new(PackageInner {
                manifest: self.manifest().clone().map_source(to_replace, replace_with),
                manifest_path: self.manifest_path().to_owned(),
            }),
        }
    }

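    /// Builds the machine-readable [`SerializedPackage`] view of this package.
    /// Only targets with an on-disk source path are included.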
    pub fn serialized(
        &self,
        unstable_flags: &CliUnstable,
        cargo_features: &Features,
    ) -> SerializedPackage {
        let summary = self.manifest().summary();
        let package_id = summary.package_id();
        let manmeta = self.manifest().metadata();
        let targets: Vec<Target> = self
            .manifest()
            .targets()
            .iter()
            .filter(|t| t.src_path().is_path())
            .cloned()
            .collect();
        let crate_features = summary
            .features()
            .iter()
            .map(|(k, v)| (*k, v.iter().map(|fv| fv.to_string().into()).collect()))
            .collect();

        SerializedPackage {
            name: package_id.name(),
            version: package_id.version().clone(),
            id: package_id.to_spec(),
            license: manmeta.license.clone(),
            license_file: manmeta.license_file.clone(),
            description: manmeta.description.clone(),
            source: summary.source_id(),
            dependencies: summary
                .dependencies()
                .iter()
                .map(|dep| dep.serialized(unstable_flags, cargo_features))
                .collect(),
            targets,
            features: crate_features,
            manifest_path: self.manifest_path().to_path_buf(),
            metadata: self.manifest().custom_metadata().cloned(),
            authors: manmeta.authors.clone(),
            categories: manmeta.categories.clone(),
            keywords: manmeta.keywords.clone(),
            readme: manmeta.readme.clone(),
            repository: manmeta.repository.clone(),
            homepage: manmeta.homepage.clone(),
            documentation: manmeta.documentation.clone(),
            edition: self.manifest().edition().to_string(),
            links: self.manifest().links().map(|s| s.to_owned()),
            metabuild: self.manifest().metabuild().cloned(),
            publish: self.publish().as_ref().cloned(),
            default_run: self.manifest().default_run().map(|s| s.to_owned()),
            rust_version: self.rust_version().cloned(),
        }
    }
}

impl fmt::Display for Package {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.summary().package_id())
    }
}

impl fmt::Debug for Package {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Package")
            .field("id", &self.summary().package_id())
            .field("..", &"..")
            .finish()
    }
}

impl PartialEq for Package {
    fn eq(&self, other: &Package) -> bool {
        self.package_id() == other.package_id()
    }
}

impl Eq for Package {}

impl hash::Hash for Package {
    fn hash<H: hash::Hasher>(&self, into: &mut H) {
        self.package_id().hash(into)
    }
}

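/// A set of packages, with the intent of downloading them.
///
/// Each `PackageId` maps to a `LazyCell` slot that is filled in once the
/// corresponding source has produced the package, so converting ids to
/// `Package`s only hits the network when needed.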
pub struct PackageSet<'gctx> {
    packages: HashMap<PackageId, LazyCell<Package>>,
    sources: RefCell<SourceMap<'gctx>>,
    gctx: &'gctx GlobalContext,
    /// Shared curl handle used for all downloads.
    multi: Multi,
    /// Set to `true` while a `Downloads` created by `enable_download` is
    /// live, to prevent concurrent download sessions.
    downloading: Cell<bool>,
    /// Whether HTTP/2 multiplexing is enabled on the curl `Multi` handle.
    multiplexing: bool,
}

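/// Helper for downloading crates from a `PackageSet`.
///
/// Created by [`PackageSet::enable_download`]; drives a curl `Multi` session,
/// tracks in-flight and sleeping (retrying) transfers, and reports progress.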
pub struct Downloads<'a, 'gctx> {
    set: &'a PackageSet<'gctx>,
    /// Downloads currently in flight, keyed by token.
    pending: HashMap<usize, (Download<'gctx>, EasyHandle)>,
    /// Ids of packages whose download is in flight or waiting to be retried.
    pending_ids: HashSet<PackageId>,
    /// Downloads that failed and are waiting to be retried after a delay.
    sleeping: SleepTracker<(Download<'gctx>, Easy)>,
    /// Results of finished transfers, drained by `wait_for_curl`.
    results: Vec<(usize, Result<(), curl::Error>)>,
    /// The next token to hand out to a new download.
    next: usize,
    progress: RefCell<Option<Progress<'gctx>>>,
    downloads_finished: usize,
    downloaded_bytes: u64,
    /// Size and name of the largest crate downloaded so far.
    largest: (u64, InternedString),
    start: Instant,
    success: bool,

    timeout: HttpTimeout,
    /// Last time any transfer made progress; used for timeout tracking.
    updated_at: Cell<Instant>,
    /// When the next low-speed check should happen.
    next_speed_check: Cell<Instant>,
    /// How many bytes must arrive before `next_speed_check` to avoid a
    /// low-speed timeout.
    next_speed_check_bytes_threshold: Cell<u64>,
    /// Held while downloads are in progress to keep the package cache locked.
    _lock: CacheLock<'gctx>,
}

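/// State for a single in-flight (or retrying) crate download.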
struct Download<'gctx> {
    /// The token for this download, used as the key of the
    /// `Downloads::pending` map and stored in the curl handle as well.
    token: usize,

    /// The package being downloaded.
    id: PackageId,

    /// Downloaded data, appended to by the curl write callback.
    data: RefCell<Vec<u8>>,

    /// HTTP response headers, collected by the curl header callback.
    headers: RefCell<Vec<String>>,

    /// The URL being downloaded from.
    url: String,

    /// A descriptive string printed when the download finishes.
    descriptor: String,

    /// Statistics updated from the curl progress callback.
    total: Cell<u64>,
    current: Cell<u64>,

    /// The moment this transfer was started.
    start: Instant,
    timed_out: Cell<Option<String>>,

    /// Retry state for spurious network failures.
    retry: Retry<'gctx>,
}

impl<'gctx> PackageSet<'gctx> {
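    /// Creates a `PackageSet` for the given package ids, backed by the given
    /// sources. The shared curl `Multi` handle is configured here: HTTP/2
    /// multiplexing if enabled in the configuration, plus a small per-host
    /// connection limit.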
    pub fn new(
        package_ids: &[PackageId],
        sources: SourceMap<'gctx>,
        gctx: &'gctx GlobalContext,
    ) -> CargoResult<PackageSet<'gctx>> {
        let mut multi = Multi::new();
        let multiplexing = gctx.http_config()?.multiplexing.unwrap_or(true);
        multi
            .pipelining(false, multiplexing)
            .context("failed to enable multiplexing/pipelining in curl")?;

        multi.set_max_host_connections(2)?;

        Ok(PackageSet {
            packages: package_ids
                .iter()
                .map(|&id| (id, LazyCell::new()))
                .collect(),
            sources: RefCell::new(sources),
            gctx,
            multi,
            downloading: Cell::new(false),
            multiplexing,
        })
    }

    pub fn package_ids(&self) -> impl Iterator<Item = PackageId> + '_ {
        self.packages.keys().cloned()
    }

    pub fn packages(&self) -> impl Iterator<Item = &Package> {
        self.packages.values().filter_map(|p| p.borrow())
    }

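    /// Starts a download session, returning a `Downloads` helper.
    ///
    /// Only one session may be active at a time per `PackageSet`; this panics
    /// if called again before the previous `Downloads` is dropped. The
    /// package cache lock is held for the whole session.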
    pub fn enable_download<'a>(&'a self) -> CargoResult<Downloads<'a, 'gctx>> {
        assert!(!self.downloading.replace(true));
        let timeout = HttpTimeout::new(self.gctx)?;
        Ok(Downloads {
            start: Instant::now(),
            set: self,
            next: 0,
            pending: HashMap::new(),
            pending_ids: HashSet::new(),
            sleeping: SleepTracker::new(),
            results: Vec::new(),
            progress: RefCell::new(Some(Progress::with_style(
                "Downloading",
                ProgressStyle::Ratio,
                self.gctx,
            ))),
            downloads_finished: 0,
            downloaded_bytes: 0,
            largest: (0, "".into()),
            success: false,
            updated_at: Cell::new(Instant::now()),
            timeout,
            next_speed_check: Cell::new(Instant::now()),
            next_speed_check_bytes_threshold: Cell::new(0),
            _lock: self
                .gctx
                .acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?,
        })
    }

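    /// Gets the `Package` for `id`, downloading it (blocking) if it is not
    /// already cached in this set.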
    pub fn get_one(&self, id: PackageId) -> CargoResult<&Package> {
        if let Some(pkg) = self.packages.get(&id).and_then(|slot| slot.borrow()) {
            return Ok(pkg);
        }
        Ok(self.get_many(Some(id))?.remove(0))
    }

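    /// Downloads (if necessary) and returns the packages for all of `ids`,
    /// driving the transfers concurrently and blocking until they finish.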
    pub fn get_many(&self, ids: impl IntoIterator<Item = PackageId>) -> CargoResult<Vec<&Package>> {
        let mut pkgs = Vec::new();
        let _lock = self
            .gctx
            .acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?;
        let mut downloads = self.enable_download()?;
        for id in ids {
            pkgs.extend(downloads.start(id)?);
        }
        while downloads.remaining() > 0 {
            pkgs.push(downloads.wait()?);
        }
        downloads.success = true;
        drop(downloads);

        let mut deferred = self.gctx.deferred_global_last_use()?;
        deferred.save_no_error(self.gctx);
        Ok(pkgs)
    }

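    /// Downloads every package reachable from `root_ids` in `resolve`,
    /// considering only dependencies that are active for the requested
    /// compile kinds (plus the target kinds of artifact dependencies).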
    #[tracing::instrument(skip_all)]
    pub fn download_accessible(
        &self,
        resolve: &Resolve,
        root_ids: &[PackageId],
        has_dev_units: HasDevUnits,
        requested_kinds: &[CompileKind],
        target_data: &RustcTargetData<'gctx>,
        force_all_targets: ForceAllTargets,
    ) -> CargoResult<()> {
        fn collect_used_deps(
            used: &mut BTreeSet<(PackageId, CompileKind)>,
            resolve: &Resolve,
            pkg_id: PackageId,
            has_dev_units: HasDevUnits,
            requested_kind: CompileKind,
            target_data: &RustcTargetData<'_>,
            force_all_targets: ForceAllTargets,
        ) -> CargoResult<()> {
            if !used.insert((pkg_id, requested_kind)) {
                return Ok(());
            }
            let requested_kinds = &[requested_kind];
            let filtered_deps = PackageSet::filter_deps(
                pkg_id,
                resolve,
                has_dev_units,
                requested_kinds,
                target_data,
                force_all_targets,
            );
            for (pkg_id, deps) in filtered_deps {
                collect_used_deps(
                    used,
                    resolve,
                    pkg_id,
                    has_dev_units,
                    requested_kind,
                    target_data,
                    force_all_targets,
                )?;
                // Artifact dependencies that name an explicit target also need
                // to be collected for that target's compile kind.
                let artifact_kinds = deps.iter().filter_map(|dep| {
                    Some(
                        dep.artifact()?
                            .target()?
                            .to_resolved_compile_kind(*requested_kinds.iter().next().unwrap()),
                    )
                });
                for artifact_kind in artifact_kinds {
                    collect_used_deps(
                        used,
                        resolve,
                        pkg_id,
                        has_dev_units,
                        artifact_kind,
                        target_data,
                        force_all_targets,
                    )?;
                }
            }
            Ok(())
        }

        // Collect every (package, kind) pair reachable from the roots, then
        // dedup to the distinct package ids before downloading.
        let mut to_download = BTreeSet::new();

        for id in root_ids {
            for requested_kind in requested_kinds {
                collect_used_deps(
                    &mut to_download,
                    resolve,
                    *id,
                    has_dev_units,
                    *requested_kind,
                    target_data,
                    force_all_targets,
                )?;
            }
        }
        let to_download = to_download
            .into_iter()
            .map(|(p, _)| p)
            .collect::<BTreeSet<_>>();
        self.get_many(to_download.into_iter())?;
        Ok(())
    }

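    /// Warns about dependencies that are expected to provide a library (plain
    /// lib dependencies, or artifact dependencies with `lib = true`) when the
    /// dependency package has no lib target.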
    pub(crate) fn warn_no_lib_packages_and_artifact_libs_overlapping_deps(
        &self,
        ws: &Workspace<'gctx>,
        resolve: &Resolve,
        root_ids: &[PackageId],
        has_dev_units: HasDevUnits,
        requested_kinds: &[CompileKind],
        target_data: &RustcTargetData<'_>,
        force_all_targets: ForceAllTargets,
    ) -> CargoResult<()> {
        let no_lib_pkgs: BTreeMap<PackageId, Vec<(&Package, &HashSet<Dependency>)>> = root_ids
            .iter()
            .map(|&root_id| {
                let dep_pkgs_to_deps: Vec<_> = PackageSet::filter_deps(
                    root_id,
                    resolve,
                    has_dev_units,
                    requested_kinds,
                    target_data,
                    force_all_targets,
                )
                .collect();

                let dep_pkgs_and_deps = dep_pkgs_to_deps
                    .into_iter()
                    .filter(|(_id, deps)| deps.iter().any(|dep| dep.maybe_lib()))
                    .filter_map(|(dep_package_id, deps)| {
                        self.get_one(dep_package_id).ok().and_then(|dep_pkg| {
                            (!dep_pkg.targets().iter().any(|t| t.is_lib())).then(|| (dep_pkg, deps))
                        })
                    })
                    .collect();
                (root_id, dep_pkgs_and_deps)
            })
            .collect();

        for (pkg_id, dep_pkgs) in no_lib_pkgs {
            for (_dep_pkg_without_lib_target, deps) in dep_pkgs {
                for dep in deps.iter().filter(|dep| {
                    dep.artifact()
                        .map(|artifact| artifact.is_lib())
                        .unwrap_or(true)
                }) {
                    ws.gctx().shell().warn(&format!(
                        "{} ignoring invalid dependency `{}` which is missing a lib target",
                        pkg_id,
                        dep.name_in_toml(),
                    ))?;
                }
            }
        }
        Ok(())
    }

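    /// Yields the dependencies of `pkg_id` from `resolve`, skipping
    /// dev-dependencies when `has_dev_units` is `No` and, unless all targets
    /// are forced, dependencies whose platform is not activated for any
    /// requested kind (or the host).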
    fn filter_deps<'a>(
        pkg_id: PackageId,
        resolve: &'a Resolve,
        has_dev_units: HasDevUnits,
        requested_kinds: &'a [CompileKind],
        target_data: &'a RustcTargetData<'_>,
        force_all_targets: ForceAllTargets,
    ) -> impl Iterator<Item = (PackageId, &'a HashSet<Dependency>)> + 'a {
        resolve
            .deps(pkg_id)
            .filter(move |&(_id, deps)| {
                deps.iter().any(|dep| {
                    if dep.kind() == DepKind::Development && has_dev_units == HasDevUnits::No {
                        return false;
                    }
                    if force_all_targets == ForceAllTargets::No {
                        let activated = requested_kinds
                            .iter()
                            .chain(Some(&CompileKind::Host))
                            .any(|kind| target_data.dep_platform_activated(dep, *kind));
                        if !activated {
                            return false;
                        }
                    }
                    true
                })
            })
            .into_iter()
    }

    pub fn sources(&self) -> Ref<'_, SourceMap<'gctx>> {
        self.sources.borrow()
    }

    pub fn sources_mut(&self) -> RefMut<'_, SourceMap<'gctx>> {
        self.sources.borrow_mut()
    }

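    /// Merges the given set into this one, adding its packages and sources.
    /// Neither set may have an active download session.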
    pub fn add_set(&mut self, set: PackageSet<'gctx>) {
        assert!(!self.downloading.get());
        assert!(!set.downloading.get());
        for (pkg_id, p_cell) in set.packages {
            self.packages.entry(pkg_id).or_insert(p_cell);
        }
        let mut sources = self.sources.borrow_mut();
        let other_sources = set.sources.into_inner();
        sources.add_source_map(other_sources);
    }
}

impl<'a, 'gctx> Downloads<'a, 'gctx> {
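    /// Starts downloading the package for `id`.
    ///
    /// Returns `Ok(Some(pkg))` if the package is already available (no
    /// download needed), or `Ok(None)` if a transfer was queued; queued
    /// packages are returned from [`Downloads::wait`] as they complete.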
    #[tracing::instrument(skip_all)]
    pub fn start(&mut self, id: PackageId) -> CargoResult<Option<&'a Package>> {
        self.start_inner(id)
            .with_context(|| format!("failed to download `{}`", id))
    }

    fn start_inner(&mut self, id: PackageId) -> CargoResult<Option<&'a Package>> {
        // If the package is already cached in this set, there's nothing to do.
        let slot = self
            .set
            .packages
            .get(&id)
            .ok_or_else(|| internal(format!("couldn't find `{}` in package set", id)))?;
        if let Some(pkg) = slot.borrow() {
            return Ok(Some(pkg));
        }

        // Ask the source for the package; it may already be available locally.
        let mut sources = self.set.sources.borrow_mut();
        let source = sources
            .get_mut(id.source_id())
            .ok_or_else(|| internal(format!("couldn't find source for `{}`", id)))?;
        let pkg = source
            .download(id)
            .context("unable to get packages from source")?;
        let (url, descriptor, authorization) = match pkg {
            MaybePackage::Ready(pkg) => {
                debug!("{} doesn't need a download", id);
                assert!(slot.fill(pkg).is_ok());
                return Ok(Some(slot.borrow().unwrap()));
            }
            MaybePackage::Download {
                url,
                descriptor,
                authorization,
            } => (url, descriptor, authorization),
        };

        // Otherwise set up a curl transfer for the download.
        let token = self.next;
        self.next += 1;
        debug!(target: "network", "downloading {} as {}", id, token);
        assert!(self.pending_ids.insert(id));

        let (mut handle, _timeout) = http_handle_and_timeout(self.set.gctx)?;
        handle.get(true)?;
        handle.url(&url)?;
        handle.follow_location(true)?;
        if let Some(authorization) = authorization {
            let mut headers = curl::easy::List::new();
            headers.append(&format!("Authorization: {}", authorization))?;
            handle.http_headers(headers)?;
        }

        crate::try_old_curl_http2_pipewait!(self.set.multiplexing, handle);

        handle.write_function(move |buf| {
            debug!(target: "network", "{} - {} bytes of data", token, buf.len());
            tls::with(|downloads| {
                if let Some(downloads) = downloads {
                    downloads.pending[&token]
                        .0
                        .data
                        .borrow_mut()
                        .extend_from_slice(buf);
                }
            });
            Ok(buf.len())
        })?;
        handle.header_function(move |data| {
            tls::with(|downloads| {
                if let Some(downloads) = downloads {
                    let h = String::from_utf8_lossy(data).trim().to_string();
                    downloads.pending[&token].0.headers.borrow_mut().push(h);
                }
            });
            true
        })?;

        handle.progress(true)?;
        handle.progress_function(move |dl_total, dl_cur, _, _| {
            tls::with(|downloads| match downloads {
                Some(d) => d.progress(token, dl_total as u64, dl_cur as u64),
                None => false,
            })
        })?;

        // If the progress bar isn't enabled, it may be a while before the
        // first crate finishes downloading, so announce up front that
        // downloads have started.
        if self.downloads_finished == 0
            && self.pending.is_empty()
            && !self.progress.borrow().as_ref().unwrap().is_enabled()
        {
            self.set.gctx.shell().status("Downloading", "crates ...")?;
        }

        let dl = Download {
            token,
            data: RefCell::new(Vec::new()),
            headers: RefCell::new(Vec::new()),
            id,
            url,
            descriptor,
            total: Cell::new(0),
            current: Cell::new(0),
            start: Instant::now(),
            timed_out: Cell::new(None),
            retry: Retry::new(self.set.gctx)?,
        };
        self.enqueue(dl, handle)?;
        self.tick(WhyTick::DownloadStarted)?;

        Ok(None)
    }

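    /// Returns the number of crates that are still downloading (in flight or
    /// waiting to be retried).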
    pub fn remaining(&self) -> usize {
        self.pending.len() + self.sleeping.len()
    }

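    /// Blocks the current thread waiting for a previously enqueued download
    /// (via [`Downloads::start`]) to finish, retrying spurious network
    /// failures, and returns the downloaded package once the source has
    /// finished unpacking it.
    ///
    /// Panics if there are no remaining downloads.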
    #[tracing::instrument(skip_all)]
    pub fn wait(&mut self) -> CargoResult<&'a Package> {
        let (dl, data) = loop {
            assert_eq!(self.pending.len(), self.pending_ids.len());
            let (token, result) = self.wait_for_curl()?;
            debug!(target: "network", "{} finished with {:?}", token, result);

            let (mut dl, handle) = self
                .pending
                .remove(&token)
                .expect("got a token for a non-in-progress transfer");
            let data = mem::take(&mut *dl.data.borrow_mut());
            let headers = mem::take(&mut *dl.headers.borrow_mut());
            let mut handle = self.set.multi.remove(handle)?;
            self.pending_ids.remove(&dl.id);

            let ret = {
                let timed_out = &dl.timed_out;
                let url = &dl.url;
                dl.retry.r#try(|| {
                    if let Err(e) = result {
                        // Errors aborted by our own progress callback are our
                        // timeouts; attach the recorded message if there is
                        // one. Everything else is passed to the retry logic
                        // as-is.
                        if !e.is_aborted_by_callback() {
                            return Err(e.into());
                        }

                        return Err(match timed_out.replace(None) {
                            Some(msg) => {
                                let code = curl_sys::CURLE_OPERATION_TIMEDOUT;
                                let mut err = curl::Error::new(code);
                                err.set_extra(msg);
                                err
                            }
                            None => e,
                        }
                        .into());
                    }

                    let code = handle.response_code()?;
                    if code != 200 && code != 0 {
                        return Err(HttpNotSuccessful::new_from_handle(
                            &mut handle,
                            &url,
                            data,
                            headers,
                        )
                        .into());
                    }
                    Ok(data)
                })
            };
            match ret {
                RetryResult::Success(data) => break (dl, data),
                RetryResult::Err(e) => {
                    return Err(e.context(format!("failed to download from `{}`", dl.url)))
                }
                RetryResult::Retry(sleep) => {
                    debug!(target: "network", "download retry {} for {sleep}ms", dl.url);
                    self.sleeping.push(sleep, (dl, handle));
                }
            }
        };

        self.progress.borrow_mut().as_mut().unwrap().clear();
        self.set.gctx.shell().status("Downloaded", &dl.descriptor)?;

        self.downloads_finished += 1;
        self.downloaded_bytes += dl.total.get();
        if dl.total.get() > self.largest.0 {
            self.largest = (dl.total.get(), dl.id.name());
        }

        // Only show the "extracting" status for larger crates (>= 400 KiB);
        // small ones unpack almost instantly.
        let kib_400 = 1024 * 400;
        if dl.total.get() < kib_400 {
            self.tick(WhyTick::DownloadFinished)?;
        } else {
            self.tick(WhyTick::Extracting(&dl.id.name()))?;
        }

        let mut sources = self.set.sources.borrow_mut();
        let source = sources
            .get_mut(dl.id.source_id())
            .ok_or_else(|| internal(format!("couldn't find source for `{}`", dl.id)))?;
        let start = Instant::now();
        let pkg = source.finish_download(dl.id, data)?;

        // Don't let the time spent unpacking the crate count against the
        // download timeout bookkeeping.
        let finish_dur = start.elapsed();
        self.updated_at.set(self.updated_at.get() + finish_dur);
        self.next_speed_check
            .set(self.next_speed_check.get() + finish_dur);

        let slot = &self.set.packages[&dl.id];
        assert!(slot.fill(pkg).is_ok());
        Ok(slot.borrow().unwrap())
    }

    fn enqueue(&mut self, dl: Download<'gctx>, handle: Easy) -> CargoResult<()> {
        let mut handle = self.set.multi.add(handle)?;
        let now = Instant::now();
        handle.set_token(dl.token)?;
        self.updated_at.set(now);
        self.next_speed_check.set(now + self.timeout.dur);
        self.next_speed_check_bytes_threshold
            .set(u64::from(self.timeout.low_speed_limit));
        dl.timed_out.set(None);
        dl.current.set(0);
        dl.total.set(0);
        self.pending.insert(dl.token, (dl, handle));
        Ok(())
    }

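    /// Waits for curl to report that at least one transfer has completed,
    /// returning its token and result.
    ///
    /// This re-enqueues any sleeping (retrying) downloads, drives the curl
    /// `Multi` handle, and blocks (with a capped timeout) until a message for
    /// a finished transfer is available.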
    fn wait_for_curl(&mut self) -> CargoResult<(usize, Result<(), curl::Error>)> {
        loop {
            self.add_sleepers()?;
            let n = tls::set(self, || {
                self.set
                    .multi
                    .perform()
                    .context("failed to perform http requests")
            })?;
            debug!(target: "network", "handles remaining: {}", n);
            let results = &mut self.results;
            let pending = &self.pending;
            self.set.multi.messages(|msg| {
                let token = msg.token().expect("failed to read token");
                let handle = &pending[&token].1;
                if let Some(result) = msg.result_for(handle) {
                    results.push((token, result));
                } else {
                    debug!(target: "network", "message without a result (?)");
                }
            });

            if let Some(pair) = results.pop() {
                break Ok(pair);
            }
            assert_ne!(self.remaining(), 0);
            if self.pending.is_empty() {
                let delay = self.sleeping.time_to_next().unwrap();
                debug!(target: "network", "sleeping main thread for {delay:?}");
                std::thread::sleep(delay);
            } else {
                let min_timeout = Duration::new(1, 0);
                let timeout = self.set.multi.get_timeout()?.unwrap_or(min_timeout);
                let timeout = timeout.min(min_timeout);
                self.set
                    .multi
                    .wait(&mut [], timeout)
                    .context("failed to wait on curl `Multi`")?;
            }
        }
    }

    fn add_sleepers(&mut self) -> CargoResult<()> {
        for (dl, handle) in self.sleeping.to_retry() {
            self.pending_ids.insert(dl.id);
            self.enqueue(dl, handle)?;
        }
        Ok(())
    }

    fn progress(&self, token: usize, total: u64, cur: u64) -> bool {
        let dl = &self.pending[&token].0;
        dl.total.set(total);
        let now = Instant::now();
        if cur > dl.current.get() {
            let delta = cur - dl.current.get();
            let threshold = self.next_speed_check_bytes_threshold.get();

            dl.current.set(cur);
            self.updated_at.set(now);

            if delta >= threshold {
                self.next_speed_check.set(now + self.timeout.dur);
                self.next_speed_check_bytes_threshold
                    .set(u64::from(self.timeout.low_speed_limit));
            } else {
                self.next_speed_check_bytes_threshold.set(threshold - delta);
            }
        }
        if self.tick(WhyTick::DownloadUpdate).is_err() {
            return false;
        }

        if now > self.updated_at.get() + self.timeout.dur {
            self.updated_at.set(now);
            let msg = format!(
                "failed to download any data for `{}` within {}s",
                dl.id,
                self.timeout.dur.as_secs()
            );
            dl.timed_out.set(Some(msg));
            return false;
        }

        if now >= self.next_speed_check.get() {
            self.next_speed_check.set(now + self.timeout.dur);
            assert!(self.next_speed_check_bytes_threshold.get() > 0);
            let msg = format!(
                "download of `{}` failed to transfer more \
                 than {} bytes in {}s",
                dl.id,
                self.timeout.low_speed_limit,
                self.timeout.dur.as_secs()
            );
            dl.timed_out.set(Some(msg));
            return false;
        }

        true
    }

    fn tick(&self, why: WhyTick<'_>) -> CargoResult<()> {
        let mut progress = self.progress.borrow_mut();
        let progress = progress.as_mut().unwrap();

        if let WhyTick::DownloadUpdate = why {
            if !progress.update_allowed() {
                return Ok(());
            }
        }
        let pending = self.remaining();
        let mut msg = if pending == 1 {
            format!("{} crate", pending)
        } else {
            format!("{} crates", pending)
        };
        match why {
            WhyTick::Extracting(krate) => {
                msg.push_str(&format!(", extracting {} ...", krate));
            }
            _ => {
                let mut dur = Duration::new(0, 0);
                let mut remaining = 0;
                for (dl, _) in self.pending.values() {
                    dur += dl.start.elapsed();
                    if dl.total.get() >= dl.current.get() {
                        remaining += dl.total.get() - dl.current.get();
                    }
                }
                if remaining > 0 && dur > Duration::from_millis(500) {
                    msg.push_str(&format!(", remaining bytes: {:.1}", HumanBytes(remaining)));
                }
            }
        }
        progress.print_now(&msg)
    }
}

#[derive(Copy, Clone)]
enum WhyTick<'a> {
    DownloadStarted,
    DownloadUpdate,
    DownloadFinished,
    Extracting(&'a str),
}

impl<'a, 'gctx> Drop for Downloads<'a, 'gctx> {
    fn drop(&mut self) {
        self.set.downloading.set(false);
        let progress = self.progress.get_mut().take().unwrap();
        // Skip the summary if the progress bar is disabled, nothing was
        // downloaded, or the session didn't complete successfully.
        if !progress.is_enabled() {
            return;
        }
        if self.downloads_finished == 0 {
            return;
        }
        if !self.success {
            return;
        }
        let crate_string = if self.downloads_finished == 1 {
            "crate"
        } else {
            "crates"
        };
        let mut status = format!(
            "{} {} ({:.1}) in {}",
            self.downloads_finished,
            crate_string,
            HumanBytes(self.downloaded_bytes),
            util::elapsed(self.start.elapsed())
        );
        // Only mention the largest crate when it's big enough to matter.
        let mib_1 = 1024 * 1024;
        if self.largest.0 > mib_1 && self.downloads_finished > 1 {
            status.push_str(&format!(
                " (largest was `{}` at {:.1})",
                self.largest.1,
                HumanBytes(self.largest.0),
            ));
        }
        // Drop the progress bar before emitting the final status line.
        drop(progress);
        drop(self.set.gctx.shell().status("Downloaded", status));
    }
}

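/// Helpers for exposing the current `Downloads` to curl callbacks through a
/// thread-local pointer.
///
/// `set` stores the address of a `Downloads` for the duration of a closure
/// (restoring the previous value on exit), and `with` lets callbacks borrow
/// it again; the pointer is only dereferenced while `set` is still on the
/// stack, and a zero value means no session is active.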
mod tls {
    use std::cell::Cell;

    use super::Downloads;

    thread_local!(static PTR: Cell<usize> = const { Cell::new(0) });

    pub(crate) fn with<R>(f: impl FnOnce(Option<&Downloads<'_, '_>>) -> R) -> R {
        let ptr = PTR.with(|p| p.get());
        if ptr == 0 {
            f(None)
        } else {
            unsafe { f(Some(&*(ptr as *const Downloads<'_, '_>))) }
        }
    }

    pub(crate) fn set<R>(dl: &Downloads<'_, '_>, f: impl FnOnce() -> R) -> R {
        struct Reset<'a, T: Copy>(&'a Cell<T>, T);

        impl<'a, T: Copy> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                self.0.set(self.1);
            }
        }

        PTR.with(|p| {
            let _reset = Reset(p, p.get());
            p.set(dl as *const Downloads<'_, '_> as usize);
            f()
        })
    }
}