diff --git a/Cargo.lock b/Cargo.lock index d3e0ad32..0ba0273d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1332,6 +1332,7 @@ dependencies = [ "pacquet-executor", "pacquet-fs", "pacquet-lockfile", + "pacquet-network", "pacquet-npmrc", "pacquet-package-manager", "pacquet-package-manifest", @@ -1341,7 +1342,6 @@ dependencies = [ "pacquet-testing-utils", "pipe-trait", "pretty_assertions", - "reqwest", "serde_json", "tempfile", "tokio", @@ -1414,17 +1414,27 @@ dependencies = [ "criterion", "mockito", "node-semver", + "pacquet-network", "pacquet-registry", "pacquet-store-dir", "pacquet-tarball", "pipe-trait", "project-root", - "reqwest", "ssri", "tempfile", "tokio", ] +[[package]] +name = "pacquet-network" +version = "0.0.1" +dependencies = [ + "num_cpus", + "pipe-trait", + "reqwest", + "tokio", +] + [[package]] name = "pacquet-npmrc" version = "0.0.1" @@ -1450,6 +1460,7 @@ dependencies = [ "node-semver", "pacquet-fs", "pacquet-lockfile", + "pacquet-network", "pacquet-npmrc", "pacquet-package-manifest", "pacquet-registry", @@ -1461,7 +1472,6 @@ dependencies = [ "pretty_assertions", "rayon", "reflink-copy", - "reqwest", "tempfile", "tokio", "tracing", @@ -1491,6 +1501,7 @@ dependencies = [ "miette", "node-semver", "pacquet-diagnostics", + "pacquet-network", "pipe-trait", "pretty_assertions", "reqwest", @@ -1543,6 +1554,7 @@ dependencies = [ "miette", "pacquet-diagnostics", "pacquet-fs", + "pacquet-network", "pacquet-store-dir", "pipe-trait", "pretty_assertions", diff --git a/Cargo.toml b/Cargo.toml index 18998766..3bf34f55 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ pacquet-testing-utils = { path = "crates/testing-utils" } pacquet-package-manifest = { path = "crates/package-manifest" } pacquet-package-manager = { path = "crates/package-manager" } pacquet-lockfile = { path = "crates/lockfile" } +pacquet-network = { path = "crates/network" } pacquet-npmrc = { path = "crates/npmrc" } pacquet-executor = { path = "crates/executor" } pacquet-diagnostics = { path = "crates/diagnostics" } @@ -43,6 +44,7 @@ insta = { version = "1.34.0", features = ["yaml", "glob", "walkdir" itertools = { version = "0.11.0" } futures-util = { version = "0.3.29" } miette = { version = "5.9.0", features = ["fancy"] } +num_cpus = { version = "1.16.0" } os_display = { version = "0.1.3" } reflink-copy = { version = "0.1.9" } junction = { version = "1.0.0" } diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index a5daff4e..696751cf 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -18,6 +18,7 @@ path = "src/bin/main.rs" pacquet-executor = { workspace = true } pacquet-fs = { workspace = true } pacquet-lockfile = { workspace = true } +pacquet-network = { workspace = true } pacquet-npmrc = { workspace = true } pacquet-package-manifest = { workspace = true } pacquet-package-manager = { workspace = true } @@ -29,7 +30,6 @@ clap = { workspace = true } derive_more = { workspace = true } home = { workspace = true } miette = { workspace = true } -reqwest = { workspace = true } pipe-trait = { workspace = true } tokio = { workspace = true } diff --git a/crates/cli/src/state.rs b/crates/cli/src/state.rs index 19f64245..84bdf8d1 100644 --- a/crates/cli/src/state.rs +++ b/crates/cli/src/state.rs @@ -1,11 +1,11 @@ use derive_more::{Display, Error}; use miette::Diagnostic; use pacquet_lockfile::{LoadLockfileError, Lockfile}; +use pacquet_network::ThrottledClient; use pacquet_npmrc::Npmrc; use pacquet_package_manifest::{PackageManifest, PackageManifestError}; use pacquet_tarball::MemCache; use pipe_trait::Pipe; -use reqwest::Client; use std::path::PathBuf; /// Application state when running `pacquet run` or `pacquet install`. @@ -13,7 +13,7 @@ pub struct State { /// Shared cache that store downloaded tarballs. pub tarball_mem_cache: MemCache, /// HTTP client to make HTTP requests. - pub http_client: Client, + pub http_client: ThrottledClient, /// Configuration read from `.npmrc` pub config: &'static Npmrc, /// Data from the `package.json` file. @@ -43,7 +43,7 @@ impl State { .map_err(InitStateError::LoadManifest)?, lockfile: call_load_lockfile(config.lockfile, Lockfile::load_from_current_dir) .map_err(InitStateError::LoadLockfile)?, - http_client: Client::new(), + http_client: ThrottledClient::new_from_cpu_count(), tarball_mem_cache: MemCache::new(), }) } diff --git a/crates/network/Cargo.toml b/crates/network/Cargo.toml new file mode 100644 index 00000000..e17377c8 --- /dev/null +++ b/crates/network/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "pacquet-network" +description = "Network utility functions used by pacquet" +version = "0.0.1" +publish = false +authors.workspace = true +edition.workspace = true +homepage.workspace = true +keywords.workspace = true +license.workspace = true +repository.workspace = true + +[dependencies] +num_cpus = { workspace = true } +pipe-trait = { workspace = true } +reqwest = { workspace = true } +tokio = { workspace = true } diff --git a/crates/network/src/lib.rs b/crates/network/src/lib.rs new file mode 100644 index 00000000..3152834d --- /dev/null +++ b/crates/network/src/lib.rs @@ -0,0 +1,43 @@ +use pipe_trait::Pipe; +use reqwest::Client; +use std::future::IntoFuture; +use tokio::sync::Semaphore; + +/// Wrapper around [`Client`] with concurrent request limit enforced by the [`Semaphore`] mechanism. +#[derive(Debug)] +pub struct ThrottledClient { + semaphore: Semaphore, + client: Client, +} + +impl ThrottledClient { + /// Acquire a permit and run `proc` with the underlying [`Client`]. + pub async fn run_with_permit(&self, proc: Proc) -> ProcFuture::Output + where + Proc: FnOnce(&Client) -> ProcFuture, + ProcFuture: IntoFuture, + { + let permit = + self.semaphore.acquire().await.expect("semaphore shouldn't have been closed this soon"); + let result = proc(&self.client).await; + drop(permit); + result + } + + /// Construct a new throttled client based on the number of CPUs. + /// If the number of CPUs is greater than 16, the number of permits will be equal to the number of CPUs. + /// Otherwise, the number of permits will be 16. + pub fn new_from_cpu_count() -> Self { + const MIN_PERMITS: usize = 16; + let semaphore = num_cpus::get().max(MIN_PERMITS).pipe(Semaphore::new); + let client = Client::new(); + ThrottledClient { semaphore, client } + } +} + +/// This is only necessary for tests. +impl Default for ThrottledClient { + fn default() -> Self { + ThrottledClient::new_from_cpu_count() + } +} diff --git a/crates/package-manager/Cargo.toml b/crates/package-manager/Cargo.toml index bcda8a58..027f4e01 100644 --- a/crates/package-manager/Cargo.toml +++ b/crates/package-manager/Cargo.toml @@ -13,6 +13,7 @@ repository.workspace = true [dependencies] pacquet-fs = { workspace = true } pacquet-lockfile = { workspace = true } +pacquet-network = { workspace = true } pacquet-npmrc = { workspace = true } pacquet-package-manifest = { workspace = true } pacquet-registry = { workspace = true } @@ -25,7 +26,6 @@ node-semver = { workspace = true } pipe-trait = { workspace = true } rayon = { workspace = true } reflink-copy = { workspace = true } -reqwest = { workspace = true } tracing = { workspace = true } miette = { workspace = true } diff --git a/crates/package-manager/src/add.rs b/crates/package-manager/src/add.rs index e39294da..e4076e55 100644 --- a/crates/package-manager/src/add.rs +++ b/crates/package-manager/src/add.rs @@ -2,12 +2,12 @@ use crate::Install; use derive_more::{Display, Error}; use miette::Diagnostic; use pacquet_lockfile::Lockfile; +use pacquet_network::ThrottledClient; use pacquet_npmrc::Npmrc; use pacquet_package_manifest::PackageManifestError; use pacquet_package_manifest::{DependencyGroup, PackageManifest}; use pacquet_registry::{PackageTag, PackageVersion}; use pacquet_tarball::MemCache; -use reqwest::Client; /// This subroutine does everything `pacquet add` is supposed to do. #[must_use] @@ -17,7 +17,7 @@ where DependencyGroupList: IntoIterator, { pub tarball_mem_cache: &'a MemCache, - pub http_client: &'a Client, + pub http_client: &'a ThrottledClient, pub config: &'static Npmrc, pub manifest: &'a mut PackageManifest, pub lockfile: Option<&'a Lockfile>, diff --git a/crates/package-manager/src/create_virtual_store.rs b/crates/package-manager/src/create_virtual_store.rs index fed9a60c..3fbe9ba8 100644 --- a/crates/package-manager/src/create_virtual_store.rs +++ b/crates/package-manager/src/create_virtual_store.rs @@ -1,15 +1,15 @@ use crate::InstallPackageBySnapshot; use futures_util::future; use pacquet_lockfile::{DependencyPath, PackageSnapshot, RootProjectSnapshot}; +use pacquet_network::ThrottledClient; use pacquet_npmrc::Npmrc; use pipe_trait::Pipe; -use reqwest::Client; use std::collections::HashMap; /// This subroutine generates filesystem layout for the virtual store at `node_modules/.pacquet`. #[must_use] pub struct CreateVirtualStore<'a> { - pub http_client: &'a Client, + pub http_client: &'a ThrottledClient, pub config: &'static Npmrc, pub packages: Option<&'a HashMap>, pub project_snapshot: &'a RootProjectSnapshot, diff --git a/crates/package-manager/src/install.rs b/crates/package-manager/src/install.rs index 04655657..2a59b475 100644 --- a/crates/package-manager/src/install.rs +++ b/crates/package-manager/src/install.rs @@ -1,9 +1,9 @@ use crate::{InstallFrozenLockfile, InstallWithoutLockfile}; use pacquet_lockfile::Lockfile; +use pacquet_network::ThrottledClient; use pacquet_npmrc::Npmrc; use pacquet_package_manifest::{DependencyGroup, PackageManifest}; use pacquet_tarball::MemCache; -use reqwest::Client; /// This subroutine does everything `pacquet install` is supposed to do. #[must_use] @@ -12,7 +12,7 @@ where DependencyGroupList: IntoIterator, { pub tarball_mem_cache: &'a MemCache, - pub http_client: &'a Client, + pub http_client: &'a ThrottledClient, pub config: &'static Npmrc, pub manifest: &'a PackageManifest, pub lockfile: Option<&'a Lockfile>, diff --git a/crates/package-manager/src/install_frozen_lockfile.rs b/crates/package-manager/src/install_frozen_lockfile.rs index 124615a2..ba746373 100644 --- a/crates/package-manager/src/install_frozen_lockfile.rs +++ b/crates/package-manager/src/install_frozen_lockfile.rs @@ -1,8 +1,8 @@ use crate::{CreateVirtualStore, SymlinkDirectDependencies}; use pacquet_lockfile::{DependencyPath, PackageSnapshot, RootProjectSnapshot}; +use pacquet_network::ThrottledClient; use pacquet_npmrc::Npmrc; use pacquet_package_manifest::DependencyGroup; -use reqwest::Client; use std::collections::HashMap; /// This subroutine installs dependencies from a frozen lockfile. @@ -19,7 +19,7 @@ pub struct InstallFrozenLockfile<'a, DependencyGroupList> where DependencyGroupList: IntoIterator, { - pub http_client: &'a Client, + pub http_client: &'a ThrottledClient, pub config: &'static Npmrc, pub project_snapshot: &'a RootProjectSnapshot, pub packages: Option<&'a HashMap>, diff --git a/crates/package-manager/src/install_package_by_snapshot.rs b/crates/package-manager/src/install_package_by_snapshot.rs index 0adb755a..66106d10 100644 --- a/crates/package-manager/src/install_package_by_snapshot.rs +++ b/crates/package-manager/src/install_package_by_snapshot.rs @@ -2,17 +2,17 @@ use crate::{CreateVirtualDirBySnapshot, CreateVirtualDirError}; use derive_more::{Display, Error}; use miette::Diagnostic; use pacquet_lockfile::{DependencyPath, LockfileResolution, PackageSnapshot, PkgNameVerPeer}; +use pacquet_network::ThrottledClient; use pacquet_npmrc::Npmrc; use pacquet_tarball::{DownloadTarballToStore, TarballError}; use pipe_trait::Pipe; -use reqwest::Client; use std::borrow::Cow; /// This subroutine downloads a package tarball, extracts it, installs it to a virtual dir, /// then creates the symlink layout for the package. #[must_use] pub struct InstallPackageBySnapshot<'a> { - pub http_client: &'a Client, + pub http_client: &'a ThrottledClient, pub config: &'static Npmrc, pub dependency_path: &'a DependencyPath, pub package_snapshot: &'a PackageSnapshot, diff --git a/crates/package-manager/src/install_package_from_registry.rs b/crates/package-manager/src/install_package_from_registry.rs index b58d62b8..47ead213 100644 --- a/crates/package-manager/src/install_package_from_registry.rs +++ b/crates/package-manager/src/install_package_from_registry.rs @@ -1,10 +1,10 @@ use crate::{create_cas_files, symlink_package, CreateCasFilesError, SymlinkPackageError}; use derive_more::{Display, Error}; use miette::Diagnostic; +use pacquet_network::ThrottledClient; use pacquet_npmrc::Npmrc; use pacquet_registry::{Package, PackageTag, PackageVersion, RegistryError}; use pacquet_tarball::{DownloadTarballToStore, MemCache, TarballError}; -use reqwest::Client; use std::{path::Path, str::FromStr}; /// This subroutine executes the following and returns the package @@ -18,7 +18,7 @@ use std::{path::Path, str::FromStr}; #[must_use] pub struct InstallPackageFromRegistry<'a> { pub tarball_mem_cache: &'a MemCache, - pub http_client: &'a Client, + pub http_client: &'a ThrottledClient, pub config: &'static Npmrc, pub node_modules_dir: &'a Path, pub name: &'a str, @@ -158,7 +158,7 @@ mod tests { create_config(store_dir.path(), modules_dir.path(), virtual_store_dir.path()) .pipe(Box::new) .pipe(Box::leak); - let http_client = reqwest::Client::new(); + let http_client = ThrottledClient::new_from_cpu_count(); let package = InstallPackageFromRegistry { tarball_mem_cache: &Default::default(), config, diff --git a/crates/package-manager/src/install_without_lockfile.rs b/crates/package-manager/src/install_without_lockfile.rs index c4bddb9f..48f1903d 100644 --- a/crates/package-manager/src/install_without_lockfile.rs +++ b/crates/package-manager/src/install_without_lockfile.rs @@ -2,12 +2,12 @@ use crate::InstallPackageFromRegistry; use async_recursion::async_recursion; use futures_util::future; use node_semver::Version; +use pacquet_network::ThrottledClient; use pacquet_npmrc::Npmrc; use pacquet_package_manifest::{DependencyGroup, PackageManifest}; use pacquet_registry::PackageVersion; use pacquet_tarball::MemCache; use pipe_trait::Pipe; -use reqwest::Client; /// This subroutine install packages from a `package.json` without reading or writing a lockfile. /// @@ -21,7 +21,7 @@ use reqwest::Client; #[must_use] pub struct InstallWithoutLockfile<'a, DependencyGroupList> { pub tarball_mem_cache: &'a MemCache, - pub http_client: &'a Client, + pub http_client: &'a ThrottledClient, pub config: &'static Npmrc, pub manifest: &'a PackageManifest, pub dependency_groups: DependencyGroupList, diff --git a/crates/registry/Cargo.toml b/crates/registry/Cargo.toml index 5982971a..cbceb431 100644 --- a/crates/registry/Cargo.toml +++ b/crates/registry/Cargo.toml @@ -12,6 +12,7 @@ repository.workspace = true [dependencies] pacquet-diagnostics = { workspace = true } +pacquet-network = { workspace = true } derive_more = { workspace = true } reqwest = { workspace = true } diff --git a/crates/registry/src/package.rs b/crates/registry/src/package.rs index 8779f1c8..3bd4f087 100644 --- a/crates/registry/src/package.rs +++ b/crates/registry/src/package.rs @@ -3,6 +3,7 @@ use std::{ sync::{Arc, Mutex}, }; +use pacquet_network::ThrottledClient; use pipe_trait::Pipe; use serde::{Deserialize, Serialize}; @@ -28,15 +29,15 @@ impl PartialEq for Package { impl Package { pub async fn fetch_from_registry( name: &str, - http_client: &reqwest::Client, + http_client: &ThrottledClient, registry: &str, ) -> Result { let url = || format!("{registry}{name}"); // TODO: use reqwest URL directly let network_error = |error| NetworkError { error, url: url() }; http_client - .get(url()) - .header("content-type", "application/json") - .send() + .run_with_permit(|client| { + client.get(url()).header("content-type", "application/json").send() + }) .await .map_err(network_error)? .json::() diff --git a/crates/registry/src/package_version.rs b/crates/registry/src/package_version.rs index dcaca921..009b75d8 100644 --- a/crates/registry/src/package_version.rs +++ b/crates/registry/src/package_version.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use pacquet_network::ThrottledClient; use pipe_trait::Pipe; use serde::{Deserialize, Serialize}; @@ -26,16 +27,16 @@ impl PackageVersion { pub async fn fetch_from_registry( name: &str, tag: PackageTag, - http_client: &reqwest::Client, + http_client: &ThrottledClient, registry: &str, ) -> Result { let url = || format!("{registry}{name}/{tag}"); let network_error = |error| NetworkError { error, url: url() }; http_client - .get(url()) - .header("content-type", "application/json") - .send() + .run_with_permit(|client| { + client.get(url()).header("content-type", "application/json").send() + }) .await .map_err(network_error)? .json::() diff --git a/crates/tarball/Cargo.toml b/crates/tarball/Cargo.toml index 41e675ce..f3c2fa5d 100644 --- a/crates/tarball/Cargo.toml +++ b/crates/tarball/Cargo.toml @@ -13,6 +13,7 @@ repository.workspace = true [dependencies] pacquet-diagnostics = { workspace = true } pacquet-fs = { workspace = true } +pacquet-network = { workspace = true } pacquet-store-dir = { workspace = true } base64 = { workspace = true } diff --git a/crates/tarball/src/lib.rs b/crates/tarball/src/lib.rs index c182ff4a..c30198bb 100644 --- a/crates/tarball/src/lib.rs +++ b/crates/tarball/src/lib.rs @@ -12,11 +12,11 @@ use dashmap::DashMap; use derive_more::{Display, Error, From}; use miette::Diagnostic; use pacquet_fs::file_mode; +use pacquet_network::ThrottledClient; use pacquet_store_dir::{ PackageFileInfo, PackageFilesIndex, StoreDir, WriteCasFileError, WriteIndexFileError, }; use pipe_trait::Pipe; -use reqwest::Client; use ssri::{Integrity, IntegrityChecker}; use tar::Archive; use tokio::sync::{Notify, RwLock}; @@ -113,7 +113,7 @@ fn verify_checksum(data: &[u8], integrity: Integrity) -> Result { - pub http_client: &'a Client, + pub http_client: &'a ThrottledClient, pub store_dir: &'static StoreDir, pub package_integrity: &'a Integrity, pub package_unpacked_size: Option, @@ -180,8 +180,7 @@ impl<'a> DownloadTarballToStore<'a> { TarballError::FetchTarball(NetworkError { url: package_url.to_string(), error }) }; let response = http_client - .get(package_url) - .send() + .run_with_permit(|client| client.get(package_url).send()) .await .map_err(network_error)? .bytes() diff --git a/tasks/micro-benchmark/Cargo.toml b/tasks/micro-benchmark/Cargo.toml index 84063691..3f2734e2 100644 --- a/tasks/micro-benchmark/Cargo.toml +++ b/tasks/micro-benchmark/Cargo.toml @@ -16,6 +16,7 @@ path = "src/main.rs" [dependencies] pacquet-registry = { workspace = true } +pacquet-network = { workspace = true } pacquet-store-dir = { workspace = true } pacquet-tarball = { workspace = true } @@ -26,6 +27,5 @@ tokio = { workspace = true } tempfile = { workspace = true } pipe-trait = { workspace = true } project-root = { workspace = true } -reqwest = { workspace = true } ssri = { workspace = true } node-semver = { workspace = true } diff --git a/tasks/micro-benchmark/src/main.rs b/tasks/micro-benchmark/src/main.rs index b012b01c..ef866c46 100644 --- a/tasks/micro-benchmark/src/main.rs +++ b/tasks/micro-benchmark/src/main.rs @@ -3,11 +3,11 @@ use std::{fs, path::Path}; use clap::Parser; use criterion::{Criterion, Throughput}; use mockito::ServerGuard; +use pacquet_network::ThrottledClient; use pacquet_store_dir::StoreDir; use pacquet_tarball::DownloadTarballToStore; use pipe_trait::Pipe; use project_root::get_project_root; -use reqwest::Client; use ssri::Integrity; use tempfile::tempdir; @@ -34,7 +34,7 @@ fn bench_tarball(c: &mut Criterion, server: &mut ServerGuard, fixtures_folder: & let dir = tempdir().unwrap(); let store_dir = dir.path().to_path_buf().pipe(StoreDir::from).pipe(Box::new).pipe(Box::leak); - let http_client = Client::new(); + let http_client = ThrottledClient::new_from_cpu_count(); let cas_map = DownloadTarballToStore { http_client: &http_client,