Skip to content

Commit f76fecf

Browse files
feat: dynamic routing mvp (#619)
1 parent a53948e commit f76fecf

15 files changed

+422
-176
lines changed

ic-agent/Cargo.toml

+7-12
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@ keywords = ["internet-computer", "agent", "icp", "dfinity"]
1515
include = ["src", "Cargo.toml", "../LICENSE", "README.md"]
1616

1717
[dependencies]
18-
arc-swap = { version = "1.7", optional = true }
19-
async-channel = { version = "1.9", optional = true }
18+
arc-swap = "1.7"
19+
async-channel = "1.9"
2020
async-lock = "3.3"
2121
async-trait = "0.1"
22-
async-watch = { version = "0.3", optional = true }
22+
async-watch = "0.3"
2323
backoff = "0.4.0"
2424
cached = { version = "0.52", features = ["ahash"], default-features = false }
2525
candid = { workspace = true }
@@ -48,7 +48,7 @@ serde_cbor = { workspace = true }
4848
serde_repr = { workspace = true }
4949
sha2 = { workspace = true }
5050
simple_asn1 = "0.6.1"
51-
stop-token = { version = "0.7", optional = true }
51+
stop-token = "0.7"
5252
thiserror = { workspace = true }
5353
time = { workspace = true }
5454
tower-service = "0.3"
@@ -77,6 +77,7 @@ web-sys = { version = "0.3", features = ["Window"], optional = true }
7777
[dev-dependencies]
7878
serde_json.workspace = true
7979
tracing-subscriber = "0.3"
80+
tracing = "0.1"
8081

8182
[target.'cfg(not(target_family = "wasm"))'.dev-dependencies]
8283
tokio = { workspace = true, features = ["full"] }
@@ -109,16 +110,10 @@ wasm-bindgen = [
109110
"backoff/wasm-bindgen",
110111
"cached/wasm",
111112
]
112-
_internal_dynamic-routing = [
113-
"dep:arc-swap",
114-
"dep:async-channel",
115-
"dep:async-watch",
116-
"dep:stop-token",
117-
"tracing",
118-
]
113+
_internal_dynamic-routing = []
119114
tracing = ["dep:tracing"] # Does very little right now.
120115

121116
[package.metadata.docs.rs]
122117
targets = ["x86_64-unknown-linux-gnu", "wasm32-unknown-unknown"]
123118
rustdoc-args = ["--cfg=docsrs"]
124-
features = ["_internal_dynamic-routing"]
119+
features = []

ic-agent/src/agent/agent_config.rs

+7
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use reqwest::Client;
2+
use url::Url;
23

34
use crate::{
45
agent::{NonceFactory, NonceGenerator},
@@ -33,6 +34,10 @@ pub struct AgentConfig {
3334
pub http_service: Option<Arc<dyn HttpService>>,
3435
/// See [`with_max_polling_time`](super::AgentBuilder::with_max_polling_time).
3536
pub max_polling_time: Duration,
37+
/// See [`with_background_dynamic_routing`](super::AgentBuilder::with_background_dynamic_routing).
38+
pub background_dynamic_routing: bool,
39+
/// See [`with_url`](super::AgentBuilder::with_url).
40+
pub url: Option<Url>,
3641
}
3742

3843
impl Default for AgentConfig {
@@ -49,6 +54,8 @@ impl Default for AgentConfig {
4954
max_response_body_size: None,
5055
max_tcp_error_retries: 0,
5156
max_polling_time: Duration::from_secs(60 * 5),
57+
background_dynamic_routing: false,
58+
url: None,
5259
}
5360
}
5461
}

ic-agent/src/agent/builder.rs

+27-28
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
use url::Url;
2-
31
use crate::{
42
agent::{agent_config::AgentConfig, Agent},
53
AgentError, Identity, NonceFactory, NonceGenerator,
@@ -20,34 +18,27 @@ impl AgentBuilder {
2018
Agent::new(self.config)
2119
}
2220

23-
/// Set the dynamic transport layer for the [`Agent`], performing continuous discovery of the API boundary nodes and routing traffic via them based on latency.
24-
#[cfg(feature = "_internal_dynamic-routing")]
25-
pub async fn with_discovery_transport(self, client: reqwest::Client) -> Self {
26-
use crate::agent::route_provider::dynamic_routing::{
27-
dynamic_route_provider::{DynamicRouteProviderBuilder, IC0_SEED_DOMAIN},
28-
node::Node,
29-
snapshot::latency_based_routing::LatencyRoutingSnapshot,
30-
};
31-
// TODO: This is a temporary solution to get the seed node.
32-
let seed = Node::new(IC0_SEED_DOMAIN).unwrap();
33-
34-
let route_provider = DynamicRouteProviderBuilder::new(
35-
LatencyRoutingSnapshot::new(),
36-
vec![seed],
37-
client.clone(),
38-
)
39-
.build()
40-
.await;
41-
42-
let route_provider = Arc::new(route_provider) as Arc<dyn RouteProvider>;
43-
44-
self.with_arc_route_provider(route_provider)
45-
.with_http_client(client)
21+
/// Set the dynamic transport layer for the [`Agent`], performing continuous discovery of the API boundary nodes
22+
/// and routing traffic via them based on latency. Cannot be set together with `with_route_provider`.
23+
///
24+
/// See [`DynamicRouteProvider`](super::route_provider::DynamicRouteProvider) if more customization is needed such as polling intervals.
25+
pub async fn with_background_dynamic_routing(mut self) -> Self {
26+
assert!(
27+
self.config.route_provider.is_none(),
28+
"with_background_dynamic_routing cannot be called with with_route_provider"
29+
);
30+
self.config.background_dynamic_routing = true;
31+
self
4632
}
4733

48-
/// Set the URL of the [Agent].
49-
pub fn with_url<S: Into<String>>(self, url: S) -> Self {
50-
self.with_route_provider(url.into().parse::<Url>().unwrap())
34+
/// Set the URL of the [`Agent`]. Either this or `with_route_provider` must be called (but not both).
35+
pub fn with_url<S: Into<String>>(mut self, url: S) -> Self {
36+
assert!(
37+
self.config.route_provider.is_none(),
38+
"with_url cannot be called with with_route_provider"
39+
);
40+
self.config.url = Some(url.into().parse().unwrap());
41+
self
5142
}
5243

5344
/// Add a `NonceFactory` to this Agent. By default, no nonce is produced.
@@ -125,6 +116,14 @@ impl AgentBuilder {
125116

126117
/// Same as [`Self::with_route_provider`], but reuses an existing `Arc`.
127118
pub fn with_arc_route_provider(mut self, provider: Arc<dyn RouteProvider>) -> Self {
119+
assert!(
120+
!self.config.background_dynamic_routing,
121+
"with_background_dynamic_routing cannot be called with with_route_provider"
122+
);
123+
assert!(
124+
self.config.url.is_none(),
125+
"with_url cannot be called with with_route_provider"
126+
);
128127
self.config.route_provider = Some(provider);
129128
self
130129
}

ic-agent/src/agent/mod.rs

+55-26
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,13 @@ pub use ic_transport_types::{
2929
pub use nonce::{NonceFactory, NonceGenerator};
3030
use rangemap::{RangeInclusiveMap, RangeInclusiveSet, StepFns};
3131
use reqwest::{Body, Client, Request, Response};
32-
use route_provider::RouteProvider;
32+
use route_provider::{
33+
dynamic_routing::{
34+
dynamic_route_provider::DynamicRouteProviderBuilder, node::Node,
35+
snapshot::latency_based_routing::LatencyRoutingSnapshot,
36+
},
37+
RouteProvider, UrlUntilReady,
38+
};
3339
use time::OffsetDateTime;
3440
use tower_service::Service;
3541

@@ -58,7 +64,7 @@ use std::{
5864
borrow::Cow,
5965
collections::HashMap,
6066
convert::TryFrom,
61-
fmt,
67+
fmt::{self, Debug},
6268
future::{Future, IntoFuture},
6369
pin::Pin,
6470
sync::{Arc, Mutex, RwLock},
@@ -177,32 +183,54 @@ impl Agent {
177183

178184
/// Create an instance of an [`Agent`].
179185
pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
186+
let client = config.http_service.unwrap_or_else(|| {
187+
Arc::new(Retry429Logic {
188+
client: config.client.unwrap_or_else(|| {
189+
#[cfg(not(target_family = "wasm"))]
190+
{
191+
Client::builder()
192+
.use_rustls_tls()
193+
.timeout(Duration::from_secs(360))
194+
.build()
195+
.expect("Could not create HTTP client.")
196+
}
197+
#[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
198+
{
199+
Client::new()
200+
}
201+
}),
202+
})
203+
});
180204
Ok(Agent {
181205
nonce_factory: config.nonce_factory,
182206
identity: config.identity,
183207
ingress_expiry: config.ingress_expiry,
184208
root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
185-
client: config.http_service.unwrap_or_else(|| {
186-
Arc::new(Retry429Logic {
187-
client: config.client.unwrap_or_else(|| {
188-
#[cfg(not(target_family = "wasm"))]
189-
{
190-
Client::builder()
191-
.use_rustls_tls()
192-
.timeout(Duration::from_secs(360))
193-
.build()
194-
.expect("Could not create HTTP client.")
195-
}
196-
#[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
197-
{
198-
Client::new()
199-
}
200-
}),
201-
})
202-
}),
203-
route_provider: config
204-
.route_provider
205-
.expect("missing `url` or `route_provider` in `AgentBuilder`"),
209+
client: client.clone(),
210+
route_provider: if let Some(route_provider) = config.route_provider {
211+
route_provider
212+
} else if let Some(url) = config.url {
213+
if config.background_dynamic_routing {
214+
assert!(
215+
url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
216+
"in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
217+
);
218+
let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
219+
UrlUntilReady::new(url, async move {
220+
DynamicRouteProviderBuilder::new(
221+
LatencyRoutingSnapshot::new(),
222+
seeds,
223+
client,
224+
)
225+
.build()
226+
.await
227+
}) as Arc<dyn RouteProvider>
228+
} else {
229+
Arc::new(url)
230+
}
231+
} else {
232+
panic!("either route_provider or url must be specified");
233+
},
206234
subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
207235
verify_query_signatures: config.verify_query_signatures,
208236
concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
@@ -1862,7 +1890,7 @@ impl<'agent> IntoFuture for UpdateBuilder<'agent> {
18621890
/// HTTP client middleware. Implemented automatically for `reqwest`-compatible by-ref `tower::Service`, such as `reqwest_middleware`.
18631891
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
18641892
#[cfg_attr(not(target_family = "wasm"), async_trait)]
1865-
pub trait HttpService: Send + Sync {
1893+
pub trait HttpService: Send + Sync + Debug {
18661894
/// Perform a HTTP request. Any retry logic should call `req` again, instead of `Request::try_clone`.
18671895
async fn call<'a>(
18681896
&'a self,
@@ -1876,7 +1904,7 @@ impl<T> HttpService for T
18761904
where
18771905
for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
18781906
for<'a> <&'a Self as Service<Request>>::Future: Send,
1879-
T: Send + Sync + ?Sized,
1907+
T: Send + Sync + Debug + ?Sized,
18801908
{
18811909
#[allow(clippy::needless_arbitrary_self_type)]
18821910
async fn call<'a>(
@@ -1907,7 +1935,7 @@ where
19071935
impl<T> HttpService for T
19081936
where
19091937
for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
1910-
T: Send + Sync + ?Sized,
1938+
T: Send + Sync + Debug + ?Sized,
19111939
{
19121940
#[allow(clippy::needless_arbitrary_self_type)]
19131941
async fn call<'a>(
@@ -1919,6 +1947,7 @@ where
19191947
}
19201948
}
19211949

1950+
#[derive(Debug)]
19221951
struct Retry429Logic {
19231952
client: Client,
19241953
}

0 commit comments

Comments
 (0)