Skip to content

Commit 01c055d

Browse files
committed
Use socket2 to support SO_NODELAY and SO_KEEPALIVE on incoming connections
1 parent 1091bfa commit 01c055d

File tree

3 files changed

+44
-14
lines changed

3 files changed

+44
-14
lines changed

tonic/Cargo.toml

+3-1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ transport = [
4949
"tokio?/net",
5050
"tokio?/time",
5151
"dep:hyper-util",
52+
"dep:socket2",
5253
"dep:tower",
5354
"dep:hyper-timeout",
5455
]
@@ -81,10 +82,12 @@ prost = { version = "0.12", default-features = false, features = [
8182
async-trait = { version = "0.1.13", optional = true }
8283

8384
# transport
85+
axum = { version = "0.7", default_features = false, optional = true }
8486
h2 = { version = "0.4", optional = true }
8587
hyper = { version = "1.0", features = ["full"], optional = true }
8688
hyper-util = { version = "0.1", features = ["full"], optional = true }
8789
hyper-timeout = { version = "0.5", optional = true }
90+
socket2 = { version = ">=0.4.7, <0.6.0", optional = true, features = ["all"] }
8891
tokio-stream = { version = "0.1", features = ["net"] }
8992
tower = { version = "0.4.7", default-features = false, features = [
9093
"balance",
@@ -96,7 +99,6 @@ tower = { version = "0.4.7", default-features = false, features = [
9699
"timeout",
97100
"util",
98101
], optional = true }
99-
axum = { version = "0.7", default_features = false, optional = true }
100102

101103
# rustls
102104
async-stream = { version = "0.3", optional = true }

tonic/src/transport/server/incoming.rs

+31-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::transport::service::ServerIo;
33
use std::{
44
net::{SocketAddr, TcpListener as StdTcpListener},
55
pin::{pin, Pin},
6-
task::{Context, Poll},
6+
task::{ready, Context, Poll},
77
time::Duration,
88
};
99
use tokio::{
@@ -12,6 +12,7 @@ use tokio::{
1212
};
1313
use tokio_stream::wrappers::TcpListenerStream;
1414
use tokio_stream::{Stream, StreamExt};
15+
use tracing::warn;
1516

1617
#[cfg(not(feature = "tls"))]
1718
pub(crate) fn tcp_incoming<IO, IE, L>(
@@ -195,7 +196,35 @@ impl Stream for TcpIncoming {
195196
type Item = Result<TcpStream, std::io::Error>;
196197

197198
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
198-
Pin::new(&mut self.inner).poll_next(cx)
199+
match ready!(Pin::new(&mut self.inner).poll_next(cx)) {
200+
Some(Ok(stream)) => {
201+
set_accept_socketoptions(&stream, self.tcp_nodelay, self.tcp_keepalive_timeout);
202+
Some(Ok(stream)).into()
203+
}
204+
other => Poll::Ready(other),
205+
}
206+
}
207+
}
208+
209+
// Consistent with hyper-0.14, this function does not return an error.
210+
fn set_accept_socketoptions(
211+
stream: &TcpStream,
212+
tcp_nodelay: bool,
213+
tcp_keepalive_timeout: Option<Duration>,
214+
) {
215+
if tcp_nodelay {
216+
if let Err(e) = stream.set_nodelay(true) {
217+
warn!("error trying to set TCP nodelay: {}", e);
218+
}
219+
}
220+
221+
if let Some(timeout) = tcp_keepalive_timeout {
222+
let sock_ref = socket2::SockRef::from(&stream);
223+
let sock_keepalive = socket2::TcpKeepalive::new().with_time(timeout);
224+
225+
if let Err(e) = sock_ref.set_tcp_keepalive(&sock_keepalive) {
226+
warn!("error trying to set TCP keepalive: {}", e);
227+
}
199228
}
200229
}
201230

tonic/src/transport/server/mod.rs

+10-11
Original file line numberDiff line numberDiff line change
@@ -523,16 +523,17 @@ impl<L> Server<L> {
523523
let timeout = self.timeout;
524524
let max_frame_size = self.max_frame_size;
525525

526-
// FIXME: this requires additonal implementation here.
527-
let http2_only = !self.accept_http1;
526+
// TODO: Reqiures support from hyper-util
527+
let _http2_only = !self.accept_http1;
528528

529529
let http2_keepalive_interval = self.http2_keepalive_interval;
530530
let http2_keepalive_timeout = self
531531
.http2_keepalive_timeout
532532
.unwrap_or_else(|| Duration::new(DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS, 0));
533533
let http2_adaptive_window = self.http2_adaptive_window;
534534

535-
let http2_max_pending_accept_reset_streams = self.http2_max_pending_accept_reset_streams;
535+
// TODO: Requires a new release of hyper and hyper-util
536+
let _http2_max_pending_accept_reset_streams = self.http2_max_pending_accept_reset_streams;
536537

537538
let make_service = self.service_builder.service(svc);
538539

@@ -547,6 +548,9 @@ impl<L> Server<L> {
547548

548549
let mut builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
549550

551+
//TODO: Set http2-only when available in hyper_util
552+
//builder.http2_only(http2_only);
553+
550554
builder
551555
.http2()
552556
.initial_connection_window_size(init_connection_window_size)
@@ -555,8 +559,8 @@ impl<L> Server<L> {
555559
.keep_alive_interval(http2_keepalive_interval)
556560
.keep_alive_timeout(http2_keepalive_timeout)
557561
.adaptive_window(http2_adaptive_window.unwrap_or_default())
558-
// FIXME: wait for this to be added to hyper-util
559-
// .max_pending_accept_reset_streams(http2_max_pending_accept_reset_streams)
562+
// TODO: wait for this to be added to hyper-util
563+
//.max_pending_accept_reset_streams(http2_max_pending_accept_reset_streams)
560564
.max_frame_size(max_frame_size);
561565

562566
let (signal_tx, signal_rx) = tokio::sync::watch::channel(());
@@ -1068,18 +1072,13 @@ where
10681072
}
10691073
}
10701074

1075+
// A future which only yields `Poll::Ready` once, and thereafter yields `Poll::Pending`.
10711076
#[pin_project]
10721077
struct Fuse<F> {
10731078
#[pin]
10741079
inner: Option<F>,
10751080
}
10761081

1077-
impl<F> Fuse<F> {
1078-
fn is_terminated(self: &Pin<&mut Self>) -> bool {
1079-
self.inner.is_none()
1080-
}
1081-
}
1082-
10831082
impl<F> Future for Fuse<F>
10841083
where
10851084
F: Future,

0 commit comments

Comments
 (0)