Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(network): limit concurrent network requests #210

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pacquet-testing-utils = { path = "crates/testing-utils" }
pacquet-package-manifest = { path = "crates/package-manifest" }
pacquet-package-manager = { path = "crates/package-manager" }
pacquet-lockfile = { path = "crates/lockfile" }
pacquet-network = { path = "crates/network" }
pacquet-npmrc = { path = "crates/npmrc" }
pacquet-executor = { path = "crates/executor" }
pacquet-diagnostics = { path = "crates/diagnostics" }
Expand All @@ -43,6 +44,7 @@ insta = { version = "1.34.0", features = ["yaml", "glob", "walkdir"
itertools = { version = "0.11.0" }
futures-util = { version = "0.3.29" }
miette = { version = "5.9.0", features = ["fancy"] }
num_cpus = { version = "1.16.0" }
os_display = { version = "0.1.3" }
reflink-copy = { version = "0.1.9" }
junction = { version = "1.0.0" }
Expand Down
1 change: 1 addition & 0 deletions crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ path = "src/bin/main.rs"
pacquet-executor = { workspace = true }
pacquet-fs = { workspace = true }
pacquet-lockfile = { workspace = true }
pacquet-network = { workspace = true }
pacquet-npmrc = { workspace = true }
pacquet-package-manifest = { workspace = true }
pacquet-package-manager = { workspace = true }
Expand Down
6 changes: 3 additions & 3 deletions crates/cli/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
use derive_more::{Display, Error};
use miette::Diagnostic;
use pacquet_lockfile::{LoadLockfileError, Lockfile};
use pacquet_network::ThrottledClient;
use pacquet_npmrc::Npmrc;
use pacquet_package_manifest::{PackageManifest, PackageManifestError};
use pacquet_tarball::MemCache;
use pipe_trait::Pipe;
use reqwest::Client;
use std::path::PathBuf;

/// Application state when running `pacquet run` or `pacquet install`.
pub struct State {
/// Shared cache that store downloaded tarballs.
pub tarball_mem_cache: MemCache,
/// HTTP client to make HTTP requests.
pub http_client: Client,
pub http_client: ThrottledClient,
/// Configuration read from `.npmrc`
pub config: &'static Npmrc,
/// Data from the `package.json` file.
Expand Down Expand Up @@ -43,7 +43,7 @@ impl State {
.map_err(InitStateError::LoadManifest)?,
lockfile: call_load_lockfile(config.lockfile, Lockfile::load_from_current_dir)
.map_err(InitStateError::LoadLockfile)?,
http_client: Client::new(),
http_client: ThrottledClient::new_from_cpu_count(),
tarball_mem_cache: MemCache::new(),
})
}
Expand Down
17 changes: 17 additions & 0 deletions crates/network/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "pacquet-network"
description = "Network utility functions used by pacquet"
version = "0.0.1"
publish = false
authors.workspace = true
edition.workspace = true
homepage.workspace = true
keywords.workspace = true
license.workspace = true
repository.workspace = true

[dependencies]
num_cpus = { workspace = true }
pipe-trait = { workspace = true }
reqwest = { workspace = true }
tokio = { workspace = true }
43 changes: 43 additions & 0 deletions crates/network/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use pipe_trait::Pipe;
use reqwest::Client;
use std::future::IntoFuture;
use tokio::sync::Semaphore;

/// Wrapper around [`Client`] with concurrent request limit enforced by the [`Semaphore`] mechanism.
#[derive(Debug)]

Check warning on line 7 in crates/network/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/network/src/lib.rs#L7

Added line #L7 was not covered by tests
pub struct ThrottledClient {
semaphore: Semaphore,
client: Client,
}

impl ThrottledClient {
/// Acquire a permit and run `proc` with the underlying [`Client`].
pub async fn run_with_permit<Proc, ProcFuture>(&self, proc: Proc) -> ProcFuture::Output
where
Proc: FnOnce(&Client) -> ProcFuture,
ProcFuture: IntoFuture,
{
let permit =
self.semaphore.acquire().await.expect("semaphore shouldn't have been closed this soon");
let result = proc(&self.client).await;
drop(permit);
result
}

/// Construct a new throttled client based on the number of CPUs.
/// If the number of CPUs is greater than 16, the number of permits will be equal to the number of CPUs.
/// Otherwise, the number of permits will be 16.
pub fn new_from_cpu_count() -> Self {
const MIN_PERMITS: usize = 16;
let semaphore = num_cpus::get().max(MIN_PERMITS).pipe(Semaphore::new);
let client = Client::new();
ThrottledClient { semaphore, client }
}
}

/// This is only necessary for tests.
impl Default for ThrottledClient {
fn default() -> Self {
ThrottledClient::new_from_cpu_count()
}
}
1 change: 1 addition & 0 deletions crates/package-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ repository.workspace = true
[dependencies]
pacquet-fs = { workspace = true }
pacquet-lockfile = { workspace = true }
pacquet-network = { workspace = true }
pacquet-npmrc = { workspace = true }
pacquet-package-manifest = { workspace = true }
pacquet-registry = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions crates/package-manager/src/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use crate::Install;
use derive_more::{Display, Error};
use miette::Diagnostic;
use pacquet_lockfile::Lockfile;
use pacquet_network::ThrottledClient;
use pacquet_npmrc::Npmrc;
use pacquet_package_manifest::PackageManifestError;
use pacquet_package_manifest::{DependencyGroup, PackageManifest};
use pacquet_registry::{PackageTag, PackageVersion};
use pacquet_tarball::MemCache;
use reqwest::Client;

/// This subroutine does everything `pacquet add` is supposed to do.
#[must_use]
Expand All @@ -17,7 +17,7 @@ where
DependencyGroupList: IntoIterator<Item = DependencyGroup>,
{
pub tarball_mem_cache: &'a MemCache,
pub http_client: &'a Client,
pub http_client: &'a ThrottledClient,
pub config: &'static Npmrc,
pub manifest: &'a mut PackageManifest,
pub lockfile: Option<&'a Lockfile>,
Expand Down
4 changes: 2 additions & 2 deletions crates/package-manager/src/create_virtual_store.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use crate::InstallPackageBySnapshot;
use futures_util::future;
use pacquet_lockfile::{DependencyPath, PackageSnapshot, RootProjectSnapshot};
use pacquet_network::ThrottledClient;
use pacquet_npmrc::Npmrc;
use pipe_trait::Pipe;
use reqwest::Client;
use std::collections::HashMap;

/// This subroutine generates filesystem layout for the virtual store at `node_modules/.pacquet`.
#[must_use]
pub struct CreateVirtualStore<'a> {
pub http_client: &'a Client,
pub http_client: &'a ThrottledClient,
pub config: &'static Npmrc,
pub packages: Option<&'a HashMap<DependencyPath, PackageSnapshot>>,
pub project_snapshot: &'a RootProjectSnapshot,
Expand Down
4 changes: 2 additions & 2 deletions crates/package-manager/src/install.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::{InstallFrozenLockfile, InstallWithoutLockfile};
use pacquet_lockfile::Lockfile;
use pacquet_network::ThrottledClient;
use pacquet_npmrc::Npmrc;
use pacquet_package_manifest::{DependencyGroup, PackageManifest};
use pacquet_tarball::MemCache;
use reqwest::Client;

/// This subroutine does everything `pacquet install` is supposed to do.
#[must_use]
Expand All @@ -12,7 +12,7 @@ where
DependencyGroupList: IntoIterator<Item = DependencyGroup>,
{
pub tarball_mem_cache: &'a MemCache,
pub http_client: &'a Client,
pub http_client: &'a ThrottledClient,
pub config: &'static Npmrc,
pub manifest: &'a PackageManifest,
pub lockfile: Option<&'a Lockfile>,
Expand Down
4 changes: 2 additions & 2 deletions crates/package-manager/src/install_frozen_lockfile.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{CreateVirtualStore, SymlinkDirectDependencies};
use pacquet_lockfile::{DependencyPath, PackageSnapshot, RootProjectSnapshot};
use pacquet_network::ThrottledClient;
use pacquet_npmrc::Npmrc;
use pacquet_package_manifest::DependencyGroup;
use reqwest::Client;
use std::collections::HashMap;

/// This subroutine installs dependencies from a frozen lockfile.
Expand All @@ -19,7 +19,7 @@ pub struct InstallFrozenLockfile<'a, DependencyGroupList>
where
DependencyGroupList: IntoIterator<Item = DependencyGroup>,
{
pub http_client: &'a Client,
pub http_client: &'a ThrottledClient,
pub config: &'static Npmrc,
pub project_snapshot: &'a RootProjectSnapshot,
pub packages: Option<&'a HashMap<DependencyPath, PackageSnapshot>>,
Expand Down
4 changes: 2 additions & 2 deletions crates/package-manager/src/install_package_by_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ use crate::{CreateVirtualDirBySnapshot, CreateVirtualDirError};
use derive_more::{Display, Error};
use miette::Diagnostic;
use pacquet_lockfile::{DependencyPath, LockfileResolution, PackageSnapshot, PkgNameVerPeer};
use pacquet_network::ThrottledClient;
use pacquet_npmrc::Npmrc;
use pacquet_tarball::{DownloadTarballToStore, TarballError};
use pipe_trait::Pipe;
use reqwest::Client;
use std::borrow::Cow;

/// This subroutine downloads a package tarball, extracts it, installs it to a virtual dir,
/// then creates the symlink layout for the package.
#[must_use]
pub struct InstallPackageBySnapshot<'a> {
pub http_client: &'a Client,
pub http_client: &'a ThrottledClient,
pub config: &'static Npmrc,
pub dependency_path: &'a DependencyPath,
pub package_snapshot: &'a PackageSnapshot,
Expand Down
6 changes: 3 additions & 3 deletions crates/package-manager/src/install_package_from_registry.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::{create_cas_files, symlink_package, CreateCasFilesError, SymlinkPackageError};
use derive_more::{Display, Error};
use miette::Diagnostic;
use pacquet_network::ThrottledClient;
use pacquet_npmrc::Npmrc;
use pacquet_registry::{Package, PackageTag, PackageVersion, RegistryError};
use pacquet_tarball::{DownloadTarballToStore, MemCache, TarballError};
use reqwest::Client;
use std::{path::Path, str::FromStr};

/// This subroutine executes the following and returns the package
Expand All @@ -18,7 +18,7 @@ use std::{path::Path, str::FromStr};
#[must_use]
pub struct InstallPackageFromRegistry<'a> {
pub tarball_mem_cache: &'a MemCache,
pub http_client: &'a Client,
pub http_client: &'a ThrottledClient,
pub config: &'static Npmrc,
pub node_modules_dir: &'a Path,
pub name: &'a str,
Expand Down Expand Up @@ -158,7 +158,7 @@ mod tests {
create_config(store_dir.path(), modules_dir.path(), virtual_store_dir.path())
.pipe(Box::new)
.pipe(Box::leak);
let http_client = reqwest::Client::new();
let http_client = ThrottledClient::new_from_cpu_count();
let package = InstallPackageFromRegistry {
tarball_mem_cache: &Default::default(),
config,
Expand Down
4 changes: 2 additions & 2 deletions crates/package-manager/src/install_without_lockfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use crate::InstallPackageFromRegistry;
use async_recursion::async_recursion;
use futures_util::future;
use node_semver::Version;
use pacquet_network::ThrottledClient;
use pacquet_npmrc::Npmrc;
use pacquet_package_manifest::{DependencyGroup, PackageManifest};
use pacquet_registry::PackageVersion;
use pacquet_tarball::MemCache;
use pipe_trait::Pipe;
use reqwest::Client;

/// This subroutine install packages from a `package.json` without reading or writing a lockfile.
///
Expand All @@ -21,7 +21,7 @@ use reqwest::Client;
#[must_use]
pub struct InstallWithoutLockfile<'a, DependencyGroupList> {
pub tarball_mem_cache: &'a MemCache,
pub http_client: &'a Client,
pub http_client: &'a ThrottledClient,
pub config: &'static Npmrc,
pub manifest: &'a PackageManifest,
pub dependency_groups: DependencyGroupList,
Expand Down
1 change: 1 addition & 0 deletions crates/registry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ repository.workspace = true

[dependencies]
pacquet-diagnostics = { workspace = true }
pacquet-network = { workspace = true }

derive_more = { workspace = true }
reqwest = { workspace = true }
Expand Down
9 changes: 5 additions & 4 deletions crates/registry/src/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
sync::{Arc, Mutex},
};

use pacquet_network::ThrottledClient;
use pipe_trait::Pipe;
use serde::{Deserialize, Serialize};

Expand All @@ -28,15 +29,15 @@ impl PartialEq for Package {
impl Package {
pub async fn fetch_from_registry(
name: &str,
http_client: &reqwest::Client,
http_client: &ThrottledClient,
registry: &str,
) -> Result<Self, RegistryError> {
let url = || format!("{registry}{name}"); // TODO: use reqwest URL directly
let network_error = |error| NetworkError { error, url: url() };
http_client
.get(url())
.header("content-type", "application/json")
.send()
.run_with_permit(|client| {
client.get(url()).header("content-type", "application/json").send()
})
.await
.map_err(network_error)?
.json::<Package>()
Expand Down
9 changes: 5 additions & 4 deletions crates/registry/src/package_version.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;

use pacquet_network::ThrottledClient;
use pipe_trait::Pipe;
use serde::{Deserialize, Serialize};

Expand All @@ -26,16 +27,16 @@ impl PackageVersion {
pub async fn fetch_from_registry(
name: &str,
tag: PackageTag,
http_client: &reqwest::Client,
http_client: &ThrottledClient,
registry: &str,
) -> Result<Self, RegistryError> {
let url = || format!("{registry}{name}/{tag}");
let network_error = |error| NetworkError { error, url: url() };

http_client
.get(url())
.header("content-type", "application/json")
.send()
.run_with_permit(|client| {
client.get(url()).header("content-type", "application/json").send()
})
.await
.map_err(network_error)?
.json::<PackageVersion>()
Expand Down
Loading