@@ -22,6 +22,7 @@ use std::sync::LazyLock;
22
22
use std:: task:: { Context , Poll } ;
23
23
use std:: time:: Duration ;
24
24
25
+ use cfg_or_panic:: cfg_or_panic;
25
26
use futures:: FutureExt ;
26
27
use http:: Uri ;
27
28
use hyper_util:: client:: legacy:: connect:: dns:: { GaiAddrs , GaiFuture , GaiResolver , Name } ;
@@ -591,23 +592,40 @@ impl Service<Name> for MonitoredGaiResolver {
591
592
}
592
593
}
593
594
595
+ #[ cfg_or_panic( not( madsim) ) ]
596
+ fn monitored_http_connector (
597
+ connection_type : impl Into < String > ,
598
+ config : TcpConfig ,
599
+ ) -> MonitoredConnection < HttpConnector < MonitoredGaiResolver > , MonitorNewConnectionImpl > {
600
+ let resolver = MonitoredGaiResolver :: default ( ) ;
601
+ let mut http = HttpConnector :: new_with_resolver ( resolver) ;
602
+
603
+ http. enforce_http ( false ) ;
604
+ http. set_nodelay ( config. tcp_nodelay ) ;
605
+ http. set_keepalive ( config. keepalive_duration ) ;
606
+
607
+ monitor_connector ( http, connection_type)
608
+ }
609
+
610
+ /// Attach general configurations to the endpoint.
611
+ #[ cfg_or_panic( not( madsim) ) ]
612
+ fn configure_endpoint ( endpoint : Endpoint ) -> Endpoint {
613
+ // This is to mitigate https://github.com/risingwavelabs/risingwave/issues/18039.
614
+ // TODO: remove this after https://github.com/hyperium/hyper/issues/3724 gets resolved.
615
+ endpoint. http2_max_header_list_size ( 16 * 1024 * 1024 )
616
+ }
617
+
594
618
#[ easy_ext:: ext( EndpointExt ) ]
595
619
impl Endpoint {
596
620
pub async fn monitored_connect (
597
- self ,
621
+ mut self ,
598
622
connection_type : impl Into < String > ,
599
623
config : TcpConfig ,
600
624
) -> Result < Channel , tonic:: transport:: Error > {
601
625
#[ cfg( not( madsim) ) ]
602
626
{
603
- let resolver = MonitoredGaiResolver :: default ( ) ;
604
- let mut http = HttpConnector :: new_with_resolver ( resolver) ;
605
-
606
- http. enforce_http ( false ) ;
607
- http. set_nodelay ( config. tcp_nodelay ) ;
608
- http. set_keepalive ( config. keepalive_duration ) ;
609
-
610
- let connector = monitor_connector ( http, connection_type) ;
627
+ self = configure_endpoint ( self ) ;
628
+ let connector = monitored_http_connector ( connection_type, config) ;
611
629
self . connect_with_connector ( connector) . await
612
630
}
613
631
#[ cfg( madsim) ]
@@ -618,16 +636,12 @@ impl Endpoint {
618
636
619
637
#[ cfg( not( madsim) ) ]
620
638
pub fn monitored_connect_lazy (
621
- self ,
639
+ mut self ,
622
640
connection_type : impl Into < String > ,
623
641
config : TcpConfig ,
624
642
) -> Channel {
625
- let mut http = HttpConnector :: new ( ) ;
626
- http. enforce_http ( false ) ;
627
- http. set_nodelay ( config. tcp_nodelay ) ;
628
- http. set_keepalive ( config. keepalive_duration ) ;
629
-
630
- let connector = monitor_connector ( http, connection_type) ;
643
+ self = configure_endpoint ( self ) ;
644
+ let connector = monitored_http_connector ( connection_type, config) ;
631
645
self . connect_with_connector_lazy ( connector)
632
646
}
633
647
}
0 commit comments