@@ -474,10 +474,10 @@ async fn transfer_from_provider(context: &Context, ticket: &Ticket) -> Result<()
474
474
let progress = ProgressEmitter :: new ( 0 , ReceiveProgress :: max_blob_progress ( ) ) ;
475
475
spawn_progress_proxy ( context. clone ( ) , progress. subscribe ( ) ) ;
476
476
477
- let jobs = Mutex :: new ( JoinSet :: default ( ) ) ;
477
+ let jobs = Arc :: new ( Mutex :: new ( JoinSet :: default ( ) ) ) ;
478
478
479
479
// Perform the transfer.
480
- let stats = run_get_request ( context, & progress, & jobs, ticket. clone ( ) ) . await ?;
480
+ let stats = run_get_request ( context, & progress, jobs. clone ( ) , ticket. clone ( ) ) . await ?;
481
481
482
482
let mut jobs = jobs. lock ( ) . await ;
483
483
while let Some ( job) = jobs. join_next ( ) . await {
@@ -496,7 +496,7 @@ async fn transfer_from_provider(context: &Context, ticket: &Ticket) -> Result<()
496
496
async fn run_get_request (
497
497
context : & Context ,
498
498
progress : & ProgressEmitter ,
499
- jobs : & Mutex < JoinSet < ( ) > > ,
499
+ jobs : Arc < Mutex < JoinSet < ( ) > > > ,
500
500
ticket : Ticket ,
501
501
) -> anyhow:: Result < Stats > {
502
502
// DERP usage for NAT traversal and relay are currently disabled.
@@ -511,6 +511,8 @@ async fn run_get_request(
511
511
let connected = initial. next ( ) . await ?;
512
512
context. emit_event ( ReceiveProgress :: Connected . into ( ) ) ;
513
513
514
+ let rt = runtime:: Handle :: from_currrent ( 1 ) ?;
515
+
514
516
// we assume that the request includes the entire collection
515
517
let ( mut next, _root, collection) = {
516
518
let fsm:: ConnectedNext :: StartRoot ( sc) = connected. next ( ) . await ? else {
@@ -541,7 +543,18 @@ async fn run_get_request(
541
543
} ;
542
544
543
545
let start = start. next ( blob. hash ) ;
544
- let done = on_blob ( context, jobs, & ticket, start, & blob. name ) . await ?;
546
+
547
+ // `iroh_io` io needs to be done on a local spawn
548
+ let ticket = ticket. clone ( ) ;
549
+ let context = context. clone ( ) ;
550
+ let name = blob. name . clone ( ) ;
551
+ let jobs = jobs. clone ( ) ;
552
+ let done = rt
553
+ . local_pool ( )
554
+ . spawn_pinned ( move || {
555
+ Box :: pin ( async move { on_blob ( context, jobs, ticket, start, & name) . await } )
556
+ } )
557
+ . await ??;
545
558
546
559
next = done. next ( ) ;
547
560
} ;
@@ -555,9 +568,9 @@ async fn run_get_request(
555
568
/// This writes the blobs to the blobdir. If the blob is the database it will import it to
556
569
/// the database of the current [`Context`].
557
570
async fn on_blob (
558
- context : & Context ,
559
- jobs : & Mutex < JoinSet < ( ) > > ,
560
- ticket : & Ticket ,
571
+ context : Context ,
572
+ jobs : Arc < Mutex < JoinSet < ( ) > > > ,
573
+ ticket : Ticket ,
561
574
state : fsm:: AtBlobHeader ,
562
575
name : & str ,
563
576
) -> Result < fsm:: AtEndBlob > {
0 commit comments