Skip to content

Commit c263c2a

Browse files
committed
Allow getting the downloader config so we can test it
1 parent 5c4816b commit c263c2a

File tree

3 files changed

+59
-12
lines changed

3 files changed

+59
-12
lines changed

src/downloader.rs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ pub enum GetOutput<N> {
141141
}
142142

143143
/// Concurrency limits for the [`Downloader`].
144-
#[derive(Debug)]
144+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
145145
pub struct ConcurrencyLimits {
146146
/// Maximum number of requests the service performs concurrently.
147147
pub max_concurrent_requests: usize,
@@ -193,7 +193,7 @@ impl ConcurrencyLimits {
193193
}
194194

195195
/// Configuration for retry behavior of the [`Downloader`].
196-
#[derive(Debug)]
196+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
197197
pub struct RetryConfig {
198198
/// Maximum number of retry attempts for a node that failed to dial or failed with IO errors.
199199
pub max_retries_per_node: u32,
@@ -325,6 +325,15 @@ impl Future for DownloadHandle {
325325
}
326326
}
327327

328+
/// All numerical config options for the downloader.
329+
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
330+
pub struct Config {
331+
/// Concurrency limits for the downloader.
332+
pub concurrency: ConcurrencyLimits,
333+
/// Retry configuration for the downloader.
334+
pub retry: RetryConfig,
335+
}
336+
328337
/// Handle for the download services.
329338
#[derive(Clone, Debug)]
330339
pub struct Downloader {
@@ -374,6 +383,15 @@ impl Downloader {
374383
}
375384
}
376385

386+
/// Get the current configuration.
387+
pub async fn get_config(&self) -> anyhow::Result<Config> {
388+
let (tx, rx) = oneshot::channel();
389+
let msg = Message::GetConfig { tx };
390+
self.msg_tx.send(msg).await?;
391+
let config = rx.await?;
392+
Ok(config)
393+
}
394+
377395
/// Queue a download.
378396
pub async fn queue(&self, request: DownloadRequest) -> DownloadHandle {
379397
let kind = request.kind;
@@ -441,6 +459,11 @@ enum Message {
441459
/// Cancel an intent. The associated request will be cancelled when the last intent is
442460
/// cancelled.
443461
CancelIntent { id: IntentId, kind: DownloadKind },
462+
/// Get the config
463+
GetConfig {
464+
#[debug(skip)]
465+
tx: oneshot::Sender<Config>,
466+
},
444467
}
445468

446469
#[derive(derive_more::Debug)]
@@ -668,6 +691,13 @@ impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
668691
self.queue.unpark_hash(hash);
669692
}
670693
}
694+
Message::GetConfig { tx } => {
695+
let config = Config {
696+
concurrency: self.concurrency_limits,
697+
retry: self.retry_config,
698+
};
699+
tx.send(config).ok();
700+
}
671701
}
672702
}
673703

src/net_protocol.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -142,18 +142,12 @@ impl BlobBatches {
142142
}
143143
}
144144

145-
#[derive(Debug, Default)]
146-
pub struct DownloaderConfig {
147-
pub concurrency: downloader::ConcurrencyLimits,
148-
pub retry: downloader::RetryConfig,
149-
}
150-
151145
/// Builder for the Blobs protocol handler
152146
#[derive(Debug)]
153147
pub struct Builder<S> {
154148
store: S,
155149
events: Option<EventSender>,
156-
downloader: Option<DownloaderConfig>,
150+
downloader: Option<crate::downloader::Config>,
157151
rt: Option<LocalPoolHandle>,
158152
}
159153

@@ -171,7 +165,7 @@ impl<S: crate::store::Store> Builder<S> {
171165
}
172166

173167
/// Set a custom downloader configuration.
174-
pub fn downloader(mut self, config: DownloaderConfig) -> Self {
168+
pub fn downloader(mut self, config: downloader::Config) -> Self {
175169
self.downloader = Some(config);
176170
self
177171
}
@@ -183,7 +177,7 @@ impl<S: crate::store::Store> Builder<S> {
183177
.rt
184178
.map(Rt::Handle)
185179
.unwrap_or_else(|| Rt::Owned(LocalPool::default()));
186-
let DownloaderConfig { concurrency, retry } = self.downloader.unwrap_or_default();
180+
let downloader::Config { concurrency, retry } = self.downloader.unwrap_or_default();
187181
let downloader = Downloader::with_config(
188182
self.store.clone(),
189183
endpoint.clone(),

tests/rpc.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#![cfg(feature = "test")]
22
use std::{net::SocketAddr, path::PathBuf, vec};
33

4-
use iroh_blobs::net_protocol::Blobs;
4+
use iroh_blobs::{downloader, net_protocol::Blobs};
55
use quic_rpc::client::QuinnConnector;
66
use tempfile::TempDir;
77
use testresult::TestResult;
@@ -85,3 +85,26 @@ async fn quinn_rpc_large() -> TestResult<()> {
8585
assert_eq!(data, &data2[..]);
8686
Ok(())
8787
}
88+
89+
#[tokio::test]
90+
async fn downloader_config() -> TestResult<()> {
91+
let _ = tracing_subscriber::fmt::try_init();
92+
let endpoint = iroh::Endpoint::builder().bind().await?;
93+
let store = iroh_blobs::store::mem::Store::default();
94+
let expected = downloader::Config {
95+
concurrency: downloader::ConcurrencyLimits {
96+
max_concurrent_requests: usize::MAX,
97+
max_concurrent_requests_per_node: usize::MAX,
98+
max_open_connections: usize::MAX,
99+
max_concurrent_dials_per_hash: usize::MAX,
100+
},
101+
retry: downloader::RetryConfig {
102+
max_retries_per_node: u32::MAX,
103+
initial_retry_delay: std::time::Duration::from_secs(1),
104+
},
105+
};
106+
let blobs = Blobs::builder(store).downloader(expected).build(&endpoint);
107+
let actual = blobs.downloader().get_config().await?;
108+
assert_eq!(expected, actual);
109+
Ok(())
110+
}

0 commit comments

Comments
 (0)