@@ -305,45 +305,44 @@ fn id_of_output(output: &Output) -> Result<RequestId> {
305305
306306#[ cfg( test) ]
307307mod tests {
308+ use std:: { future:: Future , pin:: Pin } ;
309+
308310 use super :: * ;
309311 use crate :: Error :: Rpc ;
310- use core:: pin:: Pin ;
311- use futures:: { lock:: Mutex , Future } ;
312- use hyper:: {
313- body:: HttpBody ,
314- service:: { make_service_fn, service_fn} ,
315- Body , Error , Method , Request , Response , Server ,
316- } ;
312+ use futures:: lock:: Mutex ;
313+ use http_body_util:: { BodyExt , Full } ;
314+ use hyper:: { body:: Bytes , body:: Incoming , server:: conn:: http1, service:: service_fn, Method , Request , Response } ;
315+ use hyper_util:: rt:: TokioIo ;
317316 use jsonrpc_core:: ErrorCode ;
318- use std:: net:: TcpListener ;
319- use tokio:: { task:: JoinHandle , time:: Instant } ;
317+ use tokio:: { net:: TcpListener , task:: JoinHandle , time:: Instant } ;
320318
321- type HyperResponse = Pin < Box < dyn Future < Output = hyper:: Result < Response < Body > > > + Send > > ;
319+ type HyperResponse =
320+ Pin < Box < dyn Future < Output = std:: result:: Result < Response < Full < Bytes > > , hyper:: http:: Error > > + Send + Sync > > ;
322321
323- type HyperHandler = Box < dyn Fn ( Request < Body > ) -> HyperResponse + Send + Sync > ;
322+ type HyperHandler = Box < dyn Fn ( Request < Incoming > ) -> HyperResponse + Send + Sync > ;
324323
325- fn get_available_port ( ) -> Option < u16 > {
326- Some ( TcpListener :: bind ( ( "127.0.0.1" , 0 ) ) . ok ( ) ?. local_addr ( ) . ok ( ) ?. port ( ) )
324+ async fn get_available_port ( ) -> Option < u16 > {
325+ Some (
326+ TcpListener :: bind ( ( "127.0.0.1" , 0 ) )
327+ . await
328+ . ok ( ) ?
329+ . local_addr ( )
330+ . ok ( ) ?
331+ . port ( ) ,
332+ )
327333 }
328334
329- fn create_server ( port : u16 , handler : HyperHandler ) -> JoinHandle < ( ) > {
335+ async fn create_server ( port : u16 , handler : HyperHandler ) -> JoinHandle < ( ) > {
330336 let addr = format ! ( "127.0.0.1:{}" , port) ;
337+ let listener = TcpListener :: bind ( addr) . await . unwrap ( ) ;
331338 let handler = Arc :: new ( handler) ;
332- let service = make_service_fn ( move |_| {
333- let handler = handler. clone ( ) ;
334- async move {
335- let handler = handler. clone ( ) ;
336- Ok :: < _ , Error > ( service_fn ( move |req| {
337- let handler = handler. clone ( ) ;
338- async move { handler ( req) . await }
339- } ) )
339+ tokio:: task:: spawn ( async move {
340+ loop {
341+ let ( stream, _) = listener. accept ( ) . await . unwrap ( ) ;
342+ let service = service_fn ( handler. as_ref ( ) ) ;
343+ let io = TokioIo :: new ( stream) ;
344+ http1:: Builder :: new ( ) . serve_connection ( io, service) . await . unwrap ( ) ;
340345 }
341- } ) ;
342-
343- let server = Server :: bind ( & addr. parse ( ) . unwrap ( ) ) . serve ( service) ;
344- tokio:: spawn ( async move {
345- println ! ( "Listening on http://{}" , addr) ;
346- server. await . unwrap ( ) ;
347346 } )
348347 }
349348
@@ -356,9 +355,9 @@ mod tests {
356355 }
357356
358357 fn return_429 ( retry_after_value : Option < String > ) -> HyperHandler {
359- Box :: new ( move |_req : Request < Body > | -> HyperResponse {
358+ Box :: new ( move |_req : Request < Incoming > | -> HyperResponse {
360359 let retry_after_value = retry_after_value. clone ( ) ;
361- let response_body = Body :: from (
360+ let response_body = Bytes :: from (
362361 r#"{
363362 "jsonrpc": "2.0",
364363 "error": {
@@ -373,14 +372,14 @@ mod tests {
373372 response = response. header ( "Retry-After" , value)
374373 }
375374
376- let response = response. body ( response_body) . unwrap ( ) ;
375+ let response = response. body ( Full :: new ( response_body) ) . unwrap ( ) ;
377376 Box :: pin ( async move { Ok ( response) } )
378377 } )
379378 }
380379
381380 fn return_5xx ( code : u16 ) -> HyperHandler {
382- Box :: new ( move |_req : Request < Body > | -> HyperResponse {
383- let response_body = Body :: from (
381+ Box :: new ( move |_req : Request < Incoming > | -> HyperResponse {
382+ let response_body = Bytes :: from (
384383 r#"{
385384 "jsonrpc": "2.0",
386385 "error": {
@@ -390,12 +389,12 @@ mod tests {
390389 }"# ,
391390 ) ;
392391
393- let response = Response :: builder ( ) . status ( code) . body ( response_body) . unwrap ( ) ;
392+ let response = Response :: builder ( ) . status ( code) . body ( Full :: new ( response_body) ) . unwrap ( ) ;
394393 Box :: pin ( async move { Ok ( response) } )
395394 } )
396395 }
397396
398- fn check_and_return_mock_response ( req : Request < Body > ) -> HyperResponse {
397+ fn check_and_return_mock_response ( req : Request < Incoming > ) -> HyperResponse {
399398 let expected = r#"{"jsonrpc":"2.0","method":"eth_getAccounts","params":[],"id":0}"# ;
400399 let response = r#"{"jsonrpc":"2.0","id":0,"result":"x"}"# ;
401400
@@ -405,16 +404,15 @@ mod tests {
405404 let mut body = req. into_body ( ) ;
406405
407406 Box :: pin ( async move {
408- while let Some ( Ok ( chunk) ) = body. data ( ) . await {
409- content. extend ( & * chunk) ;
407+ while let Some ( Ok ( chunk) ) = body. frame ( ) . await {
408+ content. extend ( chunk. into_data ( ) . unwrap ( ) ) ;
410409 }
411- assert_eq ! ( std:: str :: from_utf8( & * content) , Ok ( expected) ) ;
412-
413- Ok ( Response :: new ( response. into ( ) ) )
410+ assert_eq ! ( std:: str :: from_utf8( & content) , Ok ( expected) ) ;
411+ Response :: builder ( ) . status ( 200 ) . body ( Full :: new ( response. into ( ) ) )
414412 } )
415413 }
416414
417- fn return_error_response ( _req : Request < Body > ) -> HyperResponse {
415+ fn return_error_response ( _req : Request < Incoming > ) -> HyperResponse {
418416 let response = r#"{
419417 "jsonrpc":"2.0",
420418 "error":{
@@ -423,12 +421,16 @@ mod tests {
423421 },
424422 "id":null
425423 }"# ;
426- Box :: pin ( async move { Ok ( Response :: new ( response. into ( ) ) ) } )
424+ let response = Response :: builder ( )
425+ . status ( 200 )
426+ . body ( Full :: new ( response. into ( ) ) )
427+ . unwrap ( ) ;
428+ Box :: pin ( async move { Ok ( response) } )
427429 }
428430
429431 fn return_sequence ( handlers : Vec < HyperHandler > ) -> HyperHandler {
430432 let handlers = Arc :: new ( Mutex :: new ( handlers) ) ;
431- Box :: new ( move |_req : Request < Body > | -> HyperResponse {
433+ Box :: new ( move |_req : Request < Incoming > | -> HyperResponse {
432434 let handlers = handlers. clone ( ) ;
433435 Box :: pin ( async move {
434436 let mut handlers = handlers. lock ( ) . await ;
@@ -441,8 +443,8 @@ mod tests {
441443 #[ tokio:: test]
442444 async fn should_make_a_request ( ) {
443445 // given
444- let port = get_available_port ( ) . unwrap ( ) ;
445- let _ = create_server ( port, Box :: new ( check_and_return_mock_response) ) ;
446+ let port = get_available_port ( ) . await . unwrap ( ) ;
447+ let _ = create_server ( port, Box :: new ( check_and_return_mock_response) ) . await ;
446448 let client = create_client ( port, Retries :: default ( ) ) ;
447449
448450 // when
@@ -457,8 +459,8 @@ mod tests {
457459 #[ tokio:: test]
458460 async fn catch_generic_json_error_for_batched_request ( ) {
459461 // given
460- let port = get_available_port ( ) . unwrap ( ) ;
461- let _ = create_server ( port, Box :: new ( return_error_response) ) ;
462+ let port = get_available_port ( ) . await . unwrap ( ) ;
463+ let _ = create_server ( port, Box :: new ( return_error_response) ) . await ;
462464 let client = create_client ( port, Retries :: default ( ) ) ;
463465
464466 // when
@@ -505,14 +507,15 @@ mod tests {
505507 #[ tokio:: test]
506508 async fn status_code_429_with_retry_after_as_seconds ( ) {
507509 // given
508- let port = get_available_port ( ) . unwrap ( ) ;
510+ let port = get_available_port ( ) . await . unwrap ( ) ;
509511 let _ = create_server (
510512 port,
511513 return_sequence ( vec ! [
512514 return_429( Some ( "3" . into( ) ) ) ,
513515 Box :: new( check_and_return_mock_response) ,
514516 ] ) ,
515- ) ;
517+ )
518+ . await ;
516519 let client = create_client (
517520 port,
518521 Retries {
@@ -537,7 +540,7 @@ mod tests {
537540 #[ tokio:: test]
538541 async fn status_code_429_with_retry_after_as_date ( ) {
539542 // given
540- let port = get_available_port ( ) . unwrap ( ) ;
543+ let port = get_available_port ( ) . await . unwrap ( ) ;
541544 let started = Instant :: now ( ) ;
542545 let retry_after_value: DateTime < Utc > = DateTime :: from ( Utc :: now ( ) + Duration :: from_secs ( 3 ) ) ;
543546 let _ = create_server (
@@ -546,7 +549,8 @@ mod tests {
546549 return_429( Some ( retry_after_value. to_rfc2822( ) ) ) ,
547550 Box :: new( check_and_return_mock_response) ,
548551 ] ) ,
549- ) ;
552+ )
553+ . await ;
550554 let client = create_client (
551555 port,
552556 Retries {
@@ -570,11 +574,12 @@ mod tests {
570574 #[ tokio:: test]
571575 async fn status_code_429_with_invalid_retry_after ( ) {
572576 // given
573- let port = get_available_port ( ) . unwrap ( ) ;
577+ let port = get_available_port ( ) . await . unwrap ( ) ;
574578 let _ = create_server (
575579 port,
576580 return_sequence ( vec ! [ return_429( Some ( "retry some time later, idc" . into( ) ) ) ] ) ,
577- ) ;
581+ )
582+ . await ;
578583 let client = create_client (
579584 port,
580585 Retries {
@@ -596,8 +601,8 @@ mod tests {
596601 #[ tokio:: test]
597602 async fn status_code_429_without_retry_after ( ) {
598603 // given
599- let port = get_available_port ( ) . unwrap ( ) ;
600- let _ = create_server ( port, return_sequence ( vec ! [ return_429( None ) ] ) ) ;
604+ let port = get_available_port ( ) . await . unwrap ( ) ;
605+ let _ = create_server ( port, return_sequence ( vec ! [ return_429( None ) ] ) ) . await ;
601606 let client = create_client (
602607 port,
603608 Retries {
@@ -619,8 +624,8 @@ mod tests {
619624 #[ tokio:: test]
620625 async fn status_code_429_retry_after_disabled ( ) {
621626 // given
622- let port = get_available_port ( ) . unwrap ( ) ;
623- let _ = create_server ( port, return_sequence ( vec ! [ return_429( Some ( "3" . into( ) ) ) ] ) ) ;
627+ let port = get_available_port ( ) . await . unwrap ( ) ;
628+ let _ = create_server ( port, return_sequence ( vec ! [ return_429( Some ( "3" . into( ) ) ) ] ) ) . await ;
624629 let client = create_client (
625630 port,
626631 Retries {
@@ -642,15 +647,16 @@ mod tests {
642647 #[ tokio:: test]
643648 async fn status_code_429_with_retries ( ) {
644649 // given
645- let port = get_available_port ( ) . unwrap ( ) ;
650+ let port = get_available_port ( ) . await . unwrap ( ) ;
646651 let _ = create_server (
647652 port,
648653 return_sequence ( vec ! [
649654 return_429( Some ( "3" . into( ) ) ) , // sleep for 1 second as configured below
650655 return_429( Some ( "3" . into( ) ) ) , // sleep for 2 seconds (2x 1sec)
651656 Box :: new( check_and_return_mock_response) ,
652657 ] ) ,
653- ) ;
658+ )
659+ . await ;
654660 let client = create_client (
655661 port,
656662 Retries {
@@ -675,15 +681,16 @@ mod tests {
675681 #[ tokio:: test]
676682 async fn status_code_5xx_with_retries ( ) {
677683 // given
678- let port = get_available_port ( ) . unwrap ( ) ;
684+ let port = get_available_port ( ) . await . unwrap ( ) ;
679685 let _ = create_server (
680686 port,
681687 return_sequence ( vec ! [
682688 return_5xx( 500 ) , // sleep for 1 second as configured below
683689 return_5xx( 502 ) , // sleep for 2 seconds (2x 1sec)
684690 Box :: new( check_and_return_mock_response) ,
685691 ] ) ,
686- ) ;
692+ )
693+ . await ;
687694 let client = create_client (
688695 port,
689696 Retries {
@@ -708,7 +715,7 @@ mod tests {
708715 #[ tokio:: test]
709716 async fn status_code_5xx_retries_exhausted ( ) {
710717 // given
711- let port = get_available_port ( ) . unwrap ( ) ;
718+ let port = get_available_port ( ) . await . unwrap ( ) ;
712719 let _ = create_server (
713720 port,
714721 return_sequence ( vec ! [
@@ -717,7 +724,8 @@ mod tests {
717724 return_5xx( 503 ) ,
718725 Box :: new( check_and_return_mock_response) ,
719726 ] ) ,
720- ) ;
727+ )
728+ . await ;
721729 let client = create_client (
722730 port,
723731 Retries {
@@ -739,8 +747,8 @@ mod tests {
739747 #[ tokio:: test]
740748 async fn status_code_5xx_without_retries ( ) {
741749 // given
742- let port = get_available_port ( ) . unwrap ( ) ;
743- let _ = create_server ( port, return_sequence ( vec ! [ return_5xx( 500 ) ] ) ) ;
750+ let port = get_available_port ( ) . await . unwrap ( ) ;
751+ let _ = create_server ( port, return_sequence ( vec ! [ return_5xx( 500 ) ] ) ) . await ;
744752 let client = create_client (
745753 port,
746754 Retries {
0 commit comments