Skip to content

Commit 3a5ea84

Browse files
committed
A bunch of annoying plumbing just to be able to test that the config is properly applied.
- downloader handle has the config - downloader handle gets an Arc<Inner>
1 parent c263c2a commit 3a5ea84

File tree

4 files changed

+43
-54
lines changed

4 files changed

+43
-54
lines changed

src/downloader.rs

Lines changed: 28 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -335,12 +335,19 @@ pub struct Config {
335335
}
336336

337337
/// Handle for the download services.
338-
#[derive(Clone, Debug)]
338+
#[derive(Debug, Clone)]
339339
pub struct Downloader {
340+
inner: Arc<Inner>,
341+
}
342+
343+
#[derive(Debug)]
344+
struct Inner {
340345
/// Next id to use for a download intent.
341-
next_id: Arc<AtomicU64>,
346+
next_id: AtomicU64,
342347
/// Channel to communicate with the service.
343348
msg_tx: mpsc::Sender<Message>,
349+
/// Configuration for the downloader.
350+
config: Arc<Config>,
344351
}
345352

346353
impl Downloader {
@@ -349,53 +356,46 @@ impl Downloader {
349356
where
350357
S: Store,
351358
{
352-
Self::with_config(store, endpoint, rt, Default::default(), Default::default())
359+
Self::with_config(store, endpoint, rt, Default::default())
353360
}
354361

355362
/// Create a new Downloader with custom [`ConcurrencyLimits`] and [`RetryConfig`].
356-
pub fn with_config<S>(
357-
store: S,
358-
endpoint: Endpoint,
359-
rt: LocalPoolHandle,
360-
concurrency_limits: ConcurrencyLimits,
361-
retry_config: RetryConfig,
362-
) -> Self
363+
pub fn with_config<S>(store: S, endpoint: Endpoint, rt: LocalPoolHandle, config: Config) -> Self
363364
where
364365
S: Store,
365366
{
367+
let config = Arc::new(config);
366368
let me = endpoint.node_id().fmt_short();
367369
let (msg_tx, msg_rx) = mpsc::channel(SERVICE_CHANNEL_CAPACITY);
368370
let dialer = Dialer::new(endpoint);
369-
371+
let config2 = config.clone();
370372
let create_future = move || {
371373
let getter = get::IoGetter {
372374
store: store.clone(),
373375
};
374-
375-
let service = Service::new(getter, dialer, concurrency_limits, retry_config, msg_rx);
376+
let service = Service::new(getter, dialer, config2, msg_rx);
376377

377378
service.run().instrument(error_span!("downloader", %me))
378379
};
379380
rt.spawn_detached(create_future);
380381
Self {
381-
next_id: Arc::new(AtomicU64::new(0)),
382-
msg_tx,
382+
inner: Arc::new(Inner {
383+
next_id: AtomicU64::new(0),
384+
msg_tx,
385+
config,
386+
}),
383387
}
384388
}
385389

386390
/// Get the current configuration.
387-
pub async fn get_config(&self) -> anyhow::Result<Config> {
388-
let (tx, rx) = oneshot::channel();
389-
let msg = Message::GetConfig { tx };
390-
self.msg_tx.send(msg).await?;
391-
let config = rx.await?;
392-
Ok(config)
391+
pub fn config(&self) -> &Config {
392+
&self.inner.config
393393
}
394394

395395
/// Queue a download.
396396
pub async fn queue(&self, request: DownloadRequest) -> DownloadHandle {
397397
let kind = request.kind;
398-
let intent_id = IntentId(self.next_id.fetch_add(1, Ordering::SeqCst));
398+
let intent_id = IntentId(self.inner.next_id.fetch_add(1, Ordering::SeqCst));
399399
let (sender, receiver) = oneshot::channel();
400400
let handle = DownloadHandle {
401401
id: intent_id,
@@ -409,7 +409,7 @@ impl Downloader {
409409
};
410410
// if this fails polling the handle will fail as well since the sender side of the oneshot
411411
// will be dropped
412-
if let Err(send_err) = self.msg_tx.send(msg).await {
412+
if let Err(send_err) = self.inner.msg_tx.send(msg).await {
413413
let msg = send_err.0;
414414
debug!(?msg, "download not sent");
415415
}
@@ -425,7 +425,7 @@ impl Downloader {
425425
receiver: _,
426426
} = handle;
427427
let msg = Message::CancelIntent { id, kind };
428-
if let Err(send_err) = self.msg_tx.send(msg).await {
428+
if let Err(send_err) = self.inner.msg_tx.send(msg).await {
429429
let msg = send_err.0;
430430
debug!(?msg, "cancel not sent");
431431
}
@@ -437,7 +437,7 @@ impl Downloader {
437437
/// downloads. Use [`Self::queue`] to queue a download.
438438
pub async fn nodes_have(&mut self, hash: Hash, nodes: Vec<NodeId>) {
439439
let msg = Message::NodesHave { hash, nodes };
440-
if let Err(send_err) = self.msg_tx.send(msg).await {
440+
if let Err(send_err) = self.inner.msg_tx.send(msg).await {
441441
let msg = send_err.0;
442442
debug!(?msg, "nodes have not been sent")
443443
}
@@ -459,11 +459,6 @@ enum Message {
459459
/// Cancel an intent. The associated request will be cancelled when the last intent is
460460
/// cancelled.
461461
CancelIntent { id: IntentId, kind: DownloadKind },
462-
/// Get the config
463-
GetConfig {
464-
#[debug(skip)]
465-
tx: oneshot::Sender<Config>,
466-
},
467462
}
468463

469464
#[derive(derive_more::Debug)]
@@ -590,19 +585,13 @@ struct Service<G: Getter, D: DialerT> {
590585
progress_tracker: ProgressTracker,
591586
}
592587
impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
593-
fn new(
594-
getter: G,
595-
dialer: D,
596-
concurrency_limits: ConcurrencyLimits,
597-
retry_config: RetryConfig,
598-
msg_rx: mpsc::Receiver<Message>,
599-
) -> Self {
588+
fn new(getter: G, dialer: D, config: Arc<Config>, msg_rx: mpsc::Receiver<Message>) -> Self {
600589
Service {
601590
getter,
602591
dialer,
603592
msg_rx,
604-
concurrency_limits,
605-
retry_config,
593+
concurrency_limits: config.concurrency,
594+
retry_config: config.retry,
606595
connected_nodes: Default::default(),
607596
retry_node_state: Default::default(),
608597
providers: Default::default(),
@@ -691,13 +680,6 @@ impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
691680
self.queue.unpark_hash(hash);
692681
}
693682
}
694-
Message::GetConfig { tx } => {
695-
let config = Config {
696-
concurrency: self.concurrency_limits,
697-
retry: self.retry_config,
698-
};
699-
tx.send(config).ok();
700-
}
701683
}
702684
}
703685

src/downloader/test.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,24 @@ impl Downloader {
4747
let (msg_tx, msg_rx) = mpsc::channel(super::SERVICE_CHANNEL_CAPACITY);
4848

4949
let lp = LocalPool::default();
50+
let config = Arc::new(Config {
51+
concurrency: concurrency_limits,
52+
retry: retry_config,
53+
});
54+
let config2 = config.clone();
5055
lp.spawn_detached(move || async move {
5156
// we want to see the logs of the service
52-
let service = Service::new(getter, dialer, concurrency_limits, retry_config, msg_rx);
57+
let service = Service::new(getter, dialer, config2, msg_rx);
5358
service.run().await
5459
});
5560

5661
(
5762
Downloader {
58-
next_id: Arc::new(AtomicU64::new(0)),
59-
msg_tx,
63+
inner: Arc::new(Inner {
64+
next_id: AtomicU64::new(0),
65+
msg_tx,
66+
config,
67+
}),
6068
},
6169
lp,
6270
)

src/net_protocol.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,13 +177,12 @@ impl<S: crate::store::Store> Builder<S> {
177177
.rt
178178
.map(Rt::Handle)
179179
.unwrap_or_else(|| Rt::Owned(LocalPool::default()));
180-
let downloader::Config { concurrency, retry } = self.downloader.unwrap_or_default();
180+
let downloader_config = self.downloader.unwrap_or_default();
181181
let downloader = Downloader::with_config(
182182
self.store.clone(),
183183
endpoint.clone(),
184184
rt.clone(),
185-
concurrency,
186-
retry,
185+
downloader_config,
187186
);
188187
Blobs::new(
189188
self.store,

tests/rpc.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ async fn downloader_config() -> TestResult<()> {
104104
},
105105
};
106106
let blobs = Blobs::builder(store).downloader(expected).build(&endpoint);
107-
let actual = blobs.downloader().get_config().await?;
108-
assert_eq!(expected, actual);
107+
let actual = blobs.downloader().config();
108+
assert_eq!(&expected, actual);
109109
Ok(())
110110
}

0 commit comments

Comments
 (0)