Skip to content

feat: client-side ratelimiting and auto-retrying 429s #549

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

* Added a limit to the concurrent requests an agent will make at once. This should make server-side ratelimiting much rarer to encounter, even when sending a high volume of requests (for example, a large `ic_utils::ManagementCanister::install` call).
* The agent will now automatically retry 429 Too Many Requests responses after a short delay.
* BREAKING: Changed Chunk Store API to conform to the interface specification:
* `ChunkHash` was changed from `[u8; 32]` to a struct.
* Return types of `ManagementCanister::stored_chunks()` and `ManagementCanister::upload_chunk()`.
Expand Down
54 changes: 54 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 14 additions & 2 deletions ic-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ keywords = ["internet-computer", "agent", "icp", "dfinity"]
include = ["src", "Cargo.toml", "../LICENSE", "README.md"]

[dependencies]
async-lock = "3.3"
backoff = "0.4.0"
cached = { version = "0.46", features = ["ahash"], default-features = false }
candid = { workspace = true }
Expand Down Expand Up @@ -62,7 +63,11 @@ optional = true
[target.'cfg(not(target_family = "wasm"))'.dependencies]
http-body-to-bytes = { version = "0.2.0", optional = true }
http-body-util = { version = "0.1.0", optional = true }
hyper-util = { version = "0.1.3", features = ["client", "client-legacy", "http2"], optional = true }
hyper-util = { version = "0.1.3", features = [
"client",
"client-legacy",
"http2",
], optional = true }
hyper-rustls = { version = "0.26.0", features = [
"webpki-roots",
"http2",
Expand Down Expand Up @@ -98,7 +103,14 @@ web-sys = { version = "0.3", features = [
[features]
default = ["pem", "reqwest"]
reqwest = ["dep:reqwest"]
hyper = ["dep:hyper", "dep:hyper-rustls", "dep:http-body-to-bytes", "dep:http-body-util", "dep:hyper-util", "dep:tower"]
hyper = [
"dep:hyper",
"dep:hyper-rustls",
"dep:http-body-to-bytes",
"dep:http-body-util",
"dep:hyper-util",
"dep:tower",
]
ic_ref_tests = [
"default",
] # Used to separate integration tests for ic-ref which need a server running.
Expand Down
3 changes: 3 additions & 0 deletions ic-agent/src/agent/agent_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub struct AgentConfig {
pub transport: Option<Arc<dyn Transport>>,
/// See [`verify_query_signatures`](super::AgentBuilder::with_verify_query_signatures).
pub verify_query_signatures: bool,
/// See [`with_max_concurrent_requests`](super::AgentBuilder::with_max_concurrent_requests).
pub max_concurrent_requests: usize,
}

impl Default for AgentConfig {
Expand All @@ -26,6 +28,7 @@ impl Default for AgentConfig {
ingress_expiry: None,
transport: None,
verify_query_signatures: true,
max_concurrent_requests: 50,
}
}
}
69 changes: 55 additions & 14 deletions ic-agent/src/agent/agent_test.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
// Disable these tests without the reqwest feature.
#![cfg(feature = "reqwest")]

use self::mock::{assert_mock, assert_single_mock, mock, mock_additional};
use self::mock::{
assert_mock, assert_single_mock, assert_single_mock_count, mock, mock_additional,
};
use crate::{
agent::{http_transport::ReqwestTransport, Status},
export::Principal,
Agent, AgentError, Certificate,
};
use candid::{Encode, Nat};
use futures_util::FutureExt;
use ic_certification::{Delegation, Label};
use ic_transport_types::{NodeSignature, QueryResponse, RejectCode, RejectResponse, ReplyResponse};
use std::{collections::BTreeMap, time::Duration};
Expand Down Expand Up @@ -541,6 +544,31 @@ async fn too_many_delegations() {
));
}

#[cfg_attr(not(target_family = "wasm"), tokio::test)]
#[cfg_attr(target_family = "wasm", wasm_bindgen_test)]
async fn retry_ratelimit() {
let (mut mock, url) = mock(
"POST",
"/api/v2/canister/ryjl3-tyaaa-aaaaa-aaaba-cai/query",
429,
vec![],
Some("text/plain"),
)
.await;
let agent = make_agent(&url);
futures_util::select! {
_ = agent.query(&"ryjl3-tyaaa-aaaaa-aaaba-cai".parse().unwrap(), "greet").call().fuse() => panic!("did not retry 429"),
_ = crate::util::sleep(Duration::from_millis(500)).fuse() => {},
};
assert_single_mock_count(
"POST",
"/api/v2/canister/ryjl3-tyaaa-aaaaa-aaaba-cai/query",
2,
&mut mock,
)
.await;
}

#[cfg(not(target_family = "wasm"))]
mod mock {

Expand Down Expand Up @@ -604,6 +632,19 @@ mod mock {
) {
mocks[&format!("{method} {path}")].assert_async().await;
}

pub async fn assert_single_mock_count(
method: &str,
path: &str,
n: usize,
(_, mocks): &mut (ServerGuard, HashMap<String, Mock>),
) {
let k = format!("{method} {path}");
let mut mock = mocks.remove(&k).unwrap();
mock = mock.expect_at_least(n);
mock.assert_async().await;
mocks.insert(k, mock);
}
}

#[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
Expand Down Expand Up @@ -697,8 +738,8 @@ mod mock {
.unwrap();
}

pub async fn assert_mock(nonce: String) {
let hits: HashMap<String, i64> = Client::new()
async fn get_hits(nonce: &str) -> HashMap<String, i64> {
Client::new()
.get(&format!("http://mock_assert/{}", nonce))
.send()
.await
Expand All @@ -707,21 +748,21 @@ mod mock {
.unwrap()
.json()
.await
.unwrap();
.unwrap()
}

pub async fn assert_mock(nonce: String) {
let hits = get_hits(&nonce).await;
assert!(hits.values().all(|x| *x > 0));
}

pub async fn assert_single_mock(method: &str, path: &str, nonce: &String) {
let hits: HashMap<String, i64> = Client::new()
.get(&format!("http://mock_assert/{}", nonce))
.send()
.await
.unwrap()
.error_for_status()
.unwrap()
.json()
.await
.unwrap();
let hits = get_hits(nonce).await;
assert!(hits[&format!("{method} {path}")] > 0);
}

pub async fn assert_single_mock_count(method: &str, path: &str, n: usize, nonce: &mut String) {
let hits = get_hits(&*nonce).await;
assert!(hits[&format!("{method} {path}")] >= n as i64);
}
}
8 changes: 8 additions & 0 deletions ic-agent/src/agent/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,12 @@ impl AgentBuilder {
self.config.verify_query_signatures = verify_query_signatures;
self
}

/// Sets the maximum number of requests that the agent will make concurrently. The replica is configured
/// to only permit 50 concurrent requests per client. Set this value lower if you have multiple agents,
/// to avoid the slowdown of retrying any 429 errors.
pub fn with_max_concurrent_requests(mut self, max_concurrent_requests: usize) -> Self {
self.config.max_concurrent_requests = max_concurrent_requests;
self
}
}
35 changes: 21 additions & 14 deletions ic-agent/src/agent/http_transport/hyper_transport.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
//! A [`Transport`] that connects using a [`hyper`] client.
use http::StatusCode;
pub use hyper;

use std::sync::Arc;
use std::time::Duration;
use std::{any, error::Error, future::Future, marker::PhantomData, sync::atomic::AtomicPtr};

use http_body::Body;
Expand Down Expand Up @@ -140,13 +142,7 @@ where
url: String,
body: Option<Vec<u8>>,
) -> Result<Vec<u8>, AgentError> {
let http_request = Request::builder()
.method(method)
.uri(url)
.header(CONTENT_TYPE, "application/cbor")
.body(body.unwrap_or_default().into())
.map_err(|err| AgentError::TransportError(Box::new(err)))?;

let body = body.unwrap_or_default();
fn map_error<E: Error + Send + Sync + 'static>(err: E) -> AgentError {
if any::TypeId::of::<E>() == any::TypeId::of::<AgentError>() {
// Store the value in an `Option` so we can `take`
Expand All @@ -165,13 +161,24 @@ where
}
AgentError::TransportError(Box::new(err))
}
let response = self
.service
.clone()
.call(http_request)
.await
.map_err(map_error)?;

let response = loop {
let http_request = Request::builder()
.method(&method)
.uri(&url)
.header(CONTENT_TYPE, "application/cbor")
.body(body.clone().into())
.map_err(|err| AgentError::TransportError(Box::new(err)))?;
let response = self
.service
.clone()
.call(http_request)
.await
.map_err(map_error)?;
if response.status() != StatusCode::TOO_MANY_REQUESTS {
break response;
}
crate::util::sleep(Duration::from_millis(250)).await;
};
let (parts, body) = response.into_parts();
let body = if let Some(limit) = self.max_response_body_size {
http_body_to_bytes_with_max_length(body, limit)
Expand Down
10 changes: 8 additions & 2 deletions ic-agent/src/agent/http_transport/reqwest_transport.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! A [`Transport`] that connects using a [`reqwest`] client.
#![cfg(feature = "reqwest")]

use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use ic_transport_types::RejectResponse;
pub use reqwest;
Expand Down Expand Up @@ -136,7 +136,13 @@ impl ReqwestTransport {

*http_request.body_mut() = body.map(Body::from);

let request_result = self.request(http_request.try_clone().unwrap()).await?;
let request_result = loop {
let result = self.request(http_request.try_clone().unwrap()).await?;
if result.0 != StatusCode::TOO_MANY_REQUESTS {
break result;
}
crate::util::sleep(Duration::from_millis(250)).await;
};
let status = request_result.0;
let headers = request_result.1;
let body = request_result.2;
Expand Down
Loading