Skip to content

Commit 5f77a6b

Browse files
author
silence
committed
add conn idle timeout
1 parent faf24c6 commit 5f77a6b

File tree

3 files changed

+55
-6
lines changed

3 files changed

+55
-6
lines changed

src/proto/h1/conn.rs

+4
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,10 @@ where
154154
self.state.is_write_closed()
155155
}
156156

157+
pub(crate) fn is_idle(&self) -> bool {
158+
self.state.is_idle()
159+
}
160+
157161
pub(crate) fn can_read_head(&self) -> bool {
158162
if !matches!(self.state.reading, Reading::Init) {
159163
return false;

src/proto/h1/dispatch.rs

+17-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
use std::error::Error as StdError;
2+
use std::time::Duration;
23

34
use bytes::{Buf, Bytes};
45
use http::Request;
56
use tokio::io::{AsyncRead, AsyncWrite};
7+
use tokio::time::Sleep;
68
use tracing::{debug, trace};
79

810
use super::{Http1Transaction, Wants};
@@ -12,13 +14,15 @@ use crate::proto::{
1214
BodyLength, Conn, Dispatched, MessageHead, RequestHead,
1315
};
1416
use crate::upgrade::OnUpgrade;
15-
17+
// TODO will be replace by config in the formal pull request
18+
const DEFAULT_CONN_IDLE_TIMEOUT: Duration = Duration::from_secs(65);
1619
pub(crate) struct Dispatcher<D, Bs: HttpBody, I, T> {
1720
conn: Conn<I, Bs::Data, T>,
1821
dispatch: D,
1922
body_tx: Option<crate::body::Sender>,
2023
body_rx: Pin<Box<Option<Bs>>>,
2124
is_closing: bool,
25+
conn_idle_timeout: Pin<Box<Sleep>>,
2226
}
2327

2428
pub(crate) trait Dispatch {
@@ -77,6 +81,7 @@ where
7781
body_tx: None,
7882
body_rx: Box::pin(None),
7983
is_closing: false,
84+
conn_idle_timeout: Box::pin(tokio::time::sleep(DEFAULT_CONN_IDLE_TIMEOUT)),
8085
}
8186
}
8287

@@ -156,11 +161,18 @@ where
156161
//
157162
// 16 was chosen arbitrarily, as that is number of pipelined requests
158163
// benchmarks often use. Perhaps it should be a config option instead.
164+
if self.conn_idle_timeout.as_mut().poll(cx).is_ready() {
165+
trace!("conn is close");
166+
self.close();
167+
return Poll::Ready(Ok(()));
168+
}
159169
for _ in 0..16 {
160170
let _ = self.poll_read(cx)?;
161171
let _ = self.poll_write(cx)?;
162172
let _ = self.poll_flush(cx)?;
163-
173+
if self.conn.is_idle() {
174+
self.conn_idle_timeout.as_mut().reset(tokio::time::Instant::now() + DEFAULT_CONN_IDLE_TIMEOUT);
175+
}
164176
// This could happen if reading paused before blocking on IO,
165177
// such as getting to the end of a framed message, but then
166178
// writing/flushing set the state back to Init. In that case,
@@ -247,6 +259,9 @@ where
247259
// dispatch is ready for a message, try to read one
248260
match ready!(self.conn.poll_read_head(cx)) {
249261
Some(Ok((mut head, body_len, wants))) => {
262+
trace!("req is comming");
263+
// Avoid request failures due to timeout.
264+
self.conn_idle_timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(60 * 60 * 24));
250265
let body = match body_len {
251266
DecodedLength::ZERO => Body::empty(),
252267
other => {

src/proto/h2/server.rs

+34-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::error::Error as StdError;
22
use std::marker::Unpin;
3+
use std::sync::{Arc, Mutex};
34
#[cfg(feature = "runtime")]
45
use std::time::Duration;
56

@@ -24,7 +25,7 @@ use crate::service::HttpService;
2425

2526
use crate::upgrade::{OnUpgrade, Pending, Upgraded};
2627
use crate::{Body, Response};
27-
28+
use tokio::time::Sleep;
2829
// Our defaults are chosen for the "majority" case, which usually are not
2930
// resource constrained, and so the spec default of 64kb can be too limiting
3031
// for performance.
@@ -36,6 +37,9 @@ const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024; // 1mb
3637
const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb
3738
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 400; // 400kb
3839

40+
// TODO will be replace by config in the formal pull request
41+
const DEFAULT_CONN_IDLE_TIMEOUT: Duration = Duration::from_secs(65);
42+
3943
#[derive(Clone, Debug)]
4044
pub(crate) struct Config {
4145
pub(crate) adaptive_window: bool,
@@ -100,6 +104,7 @@ where
100104
ping: Option<(ping::Recorder, ping::Ponger)>,
101105
conn: Connection<T, SendBuf<B::Data>>,
102106
closing: Option<crate::Error>,
107+
conn_idle_timeout: Arc<Mutex<Pin<Box<Sleep>>>>
103108
}
104109

105110
impl<T, S, B, E> Server<T, S, B, E>
@@ -202,6 +207,7 @@ where
202207
ping,
203208
conn,
204209
closing: None,
210+
conn_idle_timeout: Arc::new(Mutex::new(Box::pin(tokio::time::sleep(DEFAULT_CONN_IDLE_TIMEOUT))))
205211
})
206212
}
207213
State::Serving(ref mut srv) => {
@@ -224,6 +230,17 @@ where
224230
T: AsyncRead + AsyncWrite + Unpin,
225231
B: HttpBody + 'static,
226232
{
233+
234+
fn reset_conn_idle_timeout(&self, time: Duration) {
235+
let mut sleep = self.conn_idle_timeout.lock().unwrap();
236+
sleep.as_mut().reset(tokio::time::Instant::now() + time);
237+
}
238+
239+
fn poll_conn_idle_timeout(&self, cx: &mut task::Context<'_>) -> bool {
240+
let mut sleep = self.conn_idle_timeout.lock().unwrap();
241+
sleep.as_mut().poll(cx).is_ready()
242+
}
243+
227244
fn poll_server<S, E>(
228245
&mut self,
229246
cx: &mut task::Context<'_>,
@@ -269,10 +286,16 @@ where
269286
break;
270287
}
271288
}
272-
289+
if self.poll_conn_idle_timeout(cx) {
290+
self.conn.graceful_shutdown();
291+
trace!("conn is close with idle");
292+
return Poll::Ready(Ok(()));
293+
}
273294
// When the service is ready, accepts an incoming request.
274295
match ready!(self.conn.poll_accept(cx)) {
275296
Some(Ok((req, mut respond))) => {
297+
// Avoid request failures due to timeout.
298+
self.reset_conn_idle_timeout(Duration::from_secs(60 * 60 * 24));
276299
trace!("incoming request");
277300
let content_length = headers::content_length_parse_all(req.headers());
278301
let ping = self
@@ -317,7 +340,7 @@ where
317340
req.extensions_mut().insert(Protocol::from_inner(protocol));
318341
}
319342

320-
let fut = H2Stream::new(service.call(req), connect_parts, respond);
343+
let fut = H2Stream::new(service.call(req), connect_parts, respond, self.conn_idle_timeout.clone());
321344
exec.execute_h2stream(fut);
322345
}
323346
Some(Err(e)) => {
@@ -373,6 +396,7 @@ pin_project! {
373396
reply: SendResponse<SendBuf<B::Data>>,
374397
#[pin]
375398
state: H2StreamState<F, B>,
399+
conn_idle_timeout: Arc<Mutex<Pin<Box<Sleep>>>>,
376400
}
377401
}
378402

@@ -408,10 +432,12 @@ where
408432
fut: F,
409433
connect_parts: Option<ConnectParts>,
410434
respond: SendResponse<SendBuf<B::Data>>,
435+
conn_idle_timeout: Arc<Mutex<Pin<Box<Sleep>>>>
411436
) -> H2Stream<F, B> {
412437
H2Stream {
413438
reply: respond,
414439
state: H2StreamState::Service { fut, connect_parts },
440+
conn_idle_timeout,
415441
}
416442
}
417443
}
@@ -534,7 +560,11 @@ where
534560
type Output = ();
535561

536562
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
537-
self.poll2(cx).map(|res| {
563+
let conn_idle_timeout = self.conn_idle_timeout.clone();
564+
let poll = self.poll2(cx);
565+
poll.map(|res| {
566+
let mut sleep = conn_idle_timeout.lock().unwrap();
567+
sleep.as_mut().reset(tokio::time::Instant::now() + DEFAULT_CONN_IDLE_TIMEOUT);
538568
if let Err(e) = res {
539569
debug!("stream error: {}", e);
540570
}

0 commit comments

Comments
 (0)