Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(http1): graceful shutdown first byte timeout #3808

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 98 additions & 26 deletions src/proto/h1/conn.rs
Original file line number Diff line number Diff line change
@@ -59,11 +59,9 @@ where
h1_parser_config: ParserConfig::default(),
h1_max_headers: None,
#[cfg(feature = "server")]
h1_header_read_timeout: None,
h1_header_read_timeout: TimeoutState::default(),
#[cfg(feature = "server")]
h1_header_read_timeout_fut: None,
#[cfg(feature = "server")]
h1_header_read_timeout_running: false,
h1_graceful_shutdown_first_byte_read_timeout: TimeoutState::default(),
#[cfg(feature = "server")]
date_header: true,
#[cfg(feature = "server")]
@@ -144,7 +142,14 @@ where

#[cfg(feature = "server")]
pub(crate) fn set_http1_header_read_timeout(&mut self, val: Duration) {
self.state.h1_header_read_timeout = Some(val);
self.state.h1_header_read_timeout.timeout = Some(val);
}

#[cfg(feature = "server")]
pub(crate) fn set_http1_graceful_shutdown_first_byte_read_timeout(&mut self, val: Duration) {
self.state
.h1_graceful_shutdown_first_byte_read_timeout
.timeout = Some(val);
}

#[cfg(feature = "server")]
@@ -209,6 +214,19 @@ where
read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE
}

fn close_if_inactive(&mut self) {
// When a graceful shutdown is triggered we wait for up to some
// `Duration` to allow for the client to begin transmitting bytes to the
// server.
// If that duration has elapsed and the connection is still idle, or
// no bytes have been received on the connection, then we close it.
// This prevents inactive connections from keeping the server alive
// despite having no intention of sending a request.
if self.is_idle() || self.has_initial_read_write_state() {
self.state.close();
}
}

pub(super) fn poll_read_head(
&mut self,
cx: &mut Context<'_>,
@@ -217,24 +235,50 @@ where
trace!("Conn::read_head");

#[cfg(feature = "server")]
if !self.state.h1_header_read_timeout_running {
if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout {
if !self.state.h1_header_read_timeout.is_running {
if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout.timeout {
self.state.h1_header_read_timeout.is_running = true;
let deadline = Instant::now() + h1_header_read_timeout;
self.state.h1_header_read_timeout_running = true;
match self.state.h1_header_read_timeout_fut {
match self.state.h1_header_read_timeout.deadline_fut {
Some(ref mut h1_header_read_timeout_fut) => {
trace!("resetting h1 header read timeout timer");
self.state.timer.reset(h1_header_read_timeout_fut, deadline);
}
None => {
trace!("setting h1 header read timeout timer");
self.state.h1_header_read_timeout_fut =
self.state.h1_header_read_timeout.deadline_fut =
Some(self.state.timer.sleep_until(deadline));
}
}
}
}

#[cfg(feature = "server")]
if !self
.state
.h1_graceful_shutdown_first_byte_read_timeout
.is_running
{
if let Some(h1_graceful_shutdown_timeout) = self
.state
.h1_graceful_shutdown_first_byte_read_timeout
.timeout
{
if h1_graceful_shutdown_timeout == Duration::from_secs(0) {
self.close_if_inactive();
} else {
self.state
.h1_graceful_shutdown_first_byte_read_timeout
.is_running = true;

let deadline = Instant::now() + h1_graceful_shutdown_timeout;
self.state
.h1_graceful_shutdown_first_byte_read_timeout
.deadline_fut = Some(self.state.timer.sleep_until(deadline));
}
}
}

let msg = match self.io.parse::<T>(
cx,
ParseContext {
@@ -254,27 +298,47 @@ where
Poll::Ready(Err(e)) => return self.on_read_head_error(e),
Poll::Pending => {
#[cfg(feature = "server")]
if self.state.h1_header_read_timeout_running {
if self.state.h1_header_read_timeout.is_running {
if let Some(ref mut h1_header_read_timeout_fut) =
self.state.h1_header_read_timeout_fut
self.state.h1_header_read_timeout.deadline_fut
{
if Pin::new(h1_header_read_timeout_fut).poll(cx).is_ready() {
self.state.h1_header_read_timeout_running = false;
self.state.h1_header_read_timeout.is_running = false;

warn!("read header from client timeout");
return Poll::Ready(Some(Err(crate::Error::new_header_timeout())));
}
}
}

#[cfg(feature = "server")]
if self
.state
.h1_graceful_shutdown_first_byte_read_timeout
.is_running
{
if let Some(ref mut h1_graceful_shutdown_timeout_fut) = self
.state
.h1_graceful_shutdown_first_byte_read_timeout
.deadline_fut
{
if Pin::new(h1_graceful_shutdown_timeout_fut)
.poll(cx)
.is_ready()
{
self.close_if_inactive();
}
}
}

return Poll::Pending;
}
};

#[cfg(feature = "server")]
{
self.state.h1_header_read_timeout_running = false;
self.state.h1_header_read_timeout_fut = None;
self.state.h1_header_read_timeout.is_running = false;
self.state.h1_header_read_timeout.deadline_fut = None;
}

// Note: don't deconstruct `msg` into local variables, it appears
@@ -872,15 +936,15 @@ where
self.state.close_write();
}

#[cfg(feature = "server")]
pub(crate) fn is_idle(&mut self) -> bool {
self.state.is_idle()
}

#[cfg(feature = "server")]
pub(crate) fn disable_keep_alive(&mut self) {
if self.state.is_idle() {
trace!("disable_keep_alive; closing idle connection");
self.state.close();
} else {
trace!("disable_keep_alive; in-progress connection");
self.state.disable_keep_alive();
}
trace!("disable_keep_alive");
self.state.disable_keep_alive();
}

pub(crate) fn take_error(&mut self) -> crate::Result<()> {
@@ -926,11 +990,11 @@ struct State {
h1_parser_config: ParserConfig,
h1_max_headers: Option<usize>,
#[cfg(feature = "server")]
h1_header_read_timeout: Option<Duration>,
h1_header_read_timeout: TimeoutState,
/// If a graceful shutdown is initiated, and the `TimeoutState` duration has elapsed without
/// receiving any bytes from the client, the connection will be closed.
#[cfg(feature = "server")]
h1_header_read_timeout_fut: Option<Pin<Box<dyn Sleep>>>,
#[cfg(feature = "server")]
h1_header_read_timeout_running: bool,
h1_graceful_shutdown_first_byte_read_timeout: TimeoutState,
#[cfg(feature = "server")]
date_header: bool,
#[cfg(feature = "server")]
@@ -1144,6 +1208,14 @@ impl State {
}
}

#[derive(Default)]
#[cfg(feature = "server")]
struct TimeoutState {
timeout: Option<Duration>,
deadline_fut: Option<Pin<Box<dyn Sleep>>>,
is_running: bool,
}

#[cfg(test)]
mod tests {
#[cfg(all(feature = "nightly", not(miri)))]
21 changes: 10 additions & 11 deletions src/proto/h1/dispatch.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
use crate::rt::{Read, Write};
use bytes::{Buf, Bytes};
use futures_util::ready;
use http::Request;
use std::time::Duration;
use std::{
error::Error as StdError,
future::Future,
@@ -6,11 +11,6 @@ use std::{
task::{Context, Poll},
};

use crate::rt::{Read, Write};
use bytes::{Buf, Bytes};
use futures_util::ready;
use http::Request;

use super::{Http1Transaction, Wants};
use crate::body::{Body, DecodedLength, Incoming as IncomingBody};
#[cfg(feature = "client")]
@@ -90,13 +90,12 @@ where
#[cfg(feature = "server")]
pub(crate) fn disable_keep_alive(&mut self) {
self.conn.disable_keep_alive();
}

// If keep alive has been disabled and no read or write has been seen on
// the connection yet, we must be in a state where the server is being asked to
// shut down before any data has been seen on the connection
if self.conn.is_write_closed() || self.conn.has_initial_read_write_state() {
self.close();
}
#[cfg(feature = "server")]
pub(crate) fn set_graceful_shutdown_first_byte_read_timeout(&mut self, read_timeout: Duration) {
self.conn
.set_http1_graceful_shutdown_first_byte_read_timeout(read_timeout);
}

pub(crate) fn into_inner(self) -> (I, Bytes, D) {
96 changes: 94 additions & 2 deletions src/server/conn/http1.rs
Original file line number Diff line number Diff line change
@@ -123,7 +123,8 @@ where
B: Body + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
/// Start a graceful shutdown process for this connection.
/// Start a graceful shutdown process for this connection, using the default
/// [`GracefulShutdownConfig`].
///
/// This `Connection` should continue to be polled until shutdown
/// can finish.
@@ -133,8 +134,29 @@ where
/// This should only be called while the `Connection` future is still
/// pending. If called after `Connection::poll` has resolved, this does
/// nothing.
pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
pub fn graceful_shutdown(self: Pin<&mut Self>) {
self.graceful_shutdown_with_config(GracefulShutdownConfig::default());
}

/// Start a graceful shutdown process for this connection.
///
/// This `Connection` should continue to be polled until shutdown can finish.
///
/// Requires a [`Timer`] set by [`Builder::timer`].
///
/// # Note
///
/// This should only be called while the `Connection` future is still
/// pending. If called after `Connection::poll` has resolved, this does
/// nothing.
///
/// # Panics
/// If [`GracefulShutdownConfig::first_byte_read_timeout`] was configured to greater than zero
/// nanoseconds, but no timer was set, then the `Connection` will panic when it is next polled.
pub fn graceful_shutdown_with_config(mut self: Pin<&mut Self>, config: GracefulShutdownConfig) {
self.conn.disable_keep_alive();
self.conn
.set_graceful_shutdown_first_byte_read_timeout(config.first_byte_read_timeout);
}

/// Return the inner IO object, and additional information.
@@ -526,3 +548,73 @@ where
}
}
}

/// Configuration for graceful shutdowns.
///
/// # Example
///
/// ```
/// # use hyper::{body::Incoming, Request, Response};
/// # use hyper::service::Service;
/// # use hyper::server::conn::http1::Builder;
/// # use hyper::rt::{Read, Write};
/// # use std::time::Duration;
/// # use hyper::server::conn::http1::GracefulShutdownConfig;
/// # async fn run<I, S>(some_io: I, some_service: S)
/// # where
/// # I: Read + Write + Unpin + Send + 'static,
/// # S: Service<hyper::Request<Incoming>, Response=hyper::Response<Incoming>> + Send + 'static,
/// # S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
/// # S::Future: Send,
/// # {
/// let http = Builder::new();
/// let conn = http.serve_connection(some_io, some_service);
///
/// let mut config = GracefulShutdownConfig::default();
/// config.first_byte_read_timeout(Duration::from_secs(2));
///
/// conn.graceful_shutdown_with_config(config);
/// conn.await.unwrap();
/// # }
/// # fn main() {}
/// ```
#[derive(Debug)]
pub struct GracefulShutdownConfig {
first_byte_read_timeout: Duration,
}
impl Default for GracefulShutdownConfig {
fn default() -> Self {
GracefulShutdownConfig {
first_byte_read_timeout: Duration::from_secs(0),
}
}
}
impl GracefulShutdownConfig {
/// It is possible for a client to open a connection and begin transmitting bytes, but have the
/// server initiate a graceful shutdown just before it sees any of the client's bytes.
///
/// The more traffic that a server receives, the more likely this race condition is to occur for
/// some of the open connections.
///
/// The `first_byte_read_timeout` controls how long the server waits for the first bytes of a
/// final request to be received from the client.
///
/// If no bytes were received from the client between the time that keep alive was disabled and
/// the `first_byte_timeout` duration, the connection is considered inactive and the server will
/// close it.
///
/// # Recommendations
/// Servers are recommended to use a `first_byte_read_timeout` that reduces the likelihood of
/// the client receiving an error due to the connection closing just after they began
/// transmitting their final request.
/// For most internet connections, a roughly one second timeout should be enough time for the
/// server to begin receiving the client's request's bytes.
///
/// # Default
/// A default of 0 seconds was chosen to remain backwards compatible with version of hyper that
/// did not have this `first_byte_read_timeout` configuration.
pub fn first_byte_read_timeout(&mut self, timeout: Duration) -> &mut Self {
self.first_byte_read_timeout = timeout;
self
}
}
93 changes: 93 additions & 0 deletions tests/server.rs
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener as TkTcpListener, TcpListener, TcpStream as TkTcpStream};

use hyper::body::{Body, Incoming as IncomingBody};
use hyper::server::conn::http1::GracefulShutdownConfig;
use hyper::server::conn::{http1, http2};
use hyper::service::{service_fn, Service};
use hyper::{Method, Request, Response, StatusCode, Uri, Version};
@@ -1348,6 +1349,38 @@ async fn http1_graceful_shutdown_after_upgrade() {
conn.as_mut().graceful_shutdown();
}

/// When hyper reached 1.0 the `Connection::graceful_shutdown` did not require a timer.
/// It would immediately close connections that were idle or had not received any bytes.
/// A later release made it possible to wait some Duration before closing the inactive connection.
/// Here we confirm that `Connection::graceful_shutdown` does not require a timer.
#[tokio::test]
async fn http1_graceful_shutdown_no_timer_required_for_zero_second_next_byte_timeout() {
let (listener, addr) = setup_tcp_listener();

tokio::spawn(async move {
let mut stream = TkTcpStream::connect(addr).await.unwrap();

stream.write_all(b"GET / HTTP/1.1\r\n\r\n").await.unwrap();

let mut buf = vec![];
stream.read_to_end(&mut buf).await.unwrap();
});

let socket = listener.accept().await.unwrap().0;
let socket = TokioIo::new(socket);

// Construct a builder that does not call the `.timer` method.
let future = http1::Builder::new().serve_connection(socket, HelloWorld);
pin!(future);

future.as_mut().graceful_shutdown();

tokio::time::timeout(Duration::from_secs(5), future)
.await
.unwrap()
.unwrap();
}

#[tokio::test]
async fn empty_parse_eof_does_not_return_error() {
let (listener, addr) = setup_tcp_listener();
@@ -2352,6 +2385,66 @@ async fn graceful_shutdown_before_first_request_no_block() {
.expect("error receiving response");
}

#[cfg(feature = "http1")]
#[tokio::test]
async fn graceful_shutdown_grace_period_for_first_byte() {
// (Client wait before sending, Server first byte timeout, Expected completed response)
let test_cases = [
// When the client sends bytes before the grace period we expect a response.
(500, 1000, true),
// When the client sends bytes after the grace period we do not expect a response.
(1000, 500, false),
];
for (client_wait_before_sending, server_first_byte_timeout, expected_to_respond) in test_cases {
let (listener, addr) = setup_tcp_listener();

let graceful_shutdown_first_byte_timeout = Duration::from_millis(server_first_byte_timeout);
let client_wait_before_sending = Duration::from_millis(client_wait_before_sending);

tokio::spawn(async move {
let socket = listener.accept().await.unwrap().0;
let socket = TokioIo::new(socket);

let future = http1::Builder::new()
.timer(TokioTimer)
.serve_connection(socket, HelloWorld);
pin!(future);

let mut graceful_config = GracefulShutdownConfig::default();
graceful_config.first_byte_read_timeout(graceful_shutdown_first_byte_timeout);
future
.as_mut()
.graceful_shutdown_with_config(graceful_config);

future.await.unwrap();
});

let mut stream = TkTcpStream::connect(addr).await.unwrap();

tokio::time::sleep(client_wait_before_sending).await;
stream.write_all(b"GET / HTTP/1.1\r\n\r\n").await.unwrap();

let mut buf = vec![];
stream.read_to_end(&mut buf).await.unwrap();

if expected_to_respond {
assert!(buf.starts_with(b"HTTP/1.1 200 OK\r\nconnection: close"));
} else {
assert!(buf.is_empty());
}

// Since the server was gracefully shut down it should not respond to any further requests.
{
stream.write_all(b"GET / HTTP/1.1\r\n\r\n").await.unwrap();

let mut buf = vec![];
stream.read_to_end(&mut buf).await.unwrap();

assert_eq!(buf.len(), 0);
}
}
}

#[test]
fn streaming_body() {
use futures_util::StreamExt;