Skip to content

Commit 77fc762

Browse files
author
silence
committed
add conn idle timeout
1 parent faf24c6 commit 77fc762

File tree

4 files changed

+61
-9
lines changed

4 files changed

+61
-9
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pin-project-lite = "0.2.4"
3535
tower-service = "0.3"
3636
tokio = { version = "1", features = ["sync"] }
3737
want = "0.3"
38-
38+
log = "0.4.14"
3939
# Optional
4040

4141
libc = { version = "0.2", optional = true }

src/proto/h1/conn.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,10 @@ where
154154
self.state.is_write_closed()
155155
}
156156

157+
pub fn is_idle(&self) -> bool {
158+
self.state.is_idle()
159+
}
160+
157161
pub(crate) fn can_read_head(&self) -> bool {
158162
if !matches!(self.state.reading, Reading::Init) {
159163
return false;
@@ -736,11 +740,11 @@ where
736740
pub(crate) fn poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
737741
match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) {
738742
Ok(()) => {
739-
trace!("shut down IO complete");
743+
log::trace!("shut down IO complete");
740744
Poll::Ready(Ok(()))
741745
}
742746
Err(e) => {
743-
debug!("error shutting down IO: {}", e);
747+
log::debug!("error shutting down IO: {}", e);
744748
Poll::Ready(Err(e))
745749
}
746750
}

src/proto/h1/dispatch.rs

+19-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
use std::error::Error as StdError;
2+
use std::time::Duration;
23

34
use bytes::{Buf, Bytes};
45
use http::Request;
56
use tokio::io::{AsyncRead, AsyncWrite};
7+
use tokio::time::Sleep;
68
use tracing::{debug, trace};
79

810
use super::{Http1Transaction, Wants};
@@ -19,6 +21,7 @@ pub(crate) struct Dispatcher<D, Bs: HttpBody, I, T> {
1921
body_tx: Option<crate::body::Sender>,
2022
body_rx: Pin<Box<Option<Bs>>>,
2123
is_closing: bool,
24+
idle_timeout_fut: Option<Pin<Box<Sleep>>>,
2225
}
2326

2427
pub(crate) trait Dispatch {
@@ -77,6 +80,7 @@ where
7780
body_tx: None,
7881
body_rx: Box::pin(None),
7982
is_closing: false,
83+
idle_timeout_fut: None
8084
}
8185
}
8286

@@ -157,10 +161,22 @@ where
157161
// 16 was chosen arbitrarily, as that is number of pipelined requests
158162
// benchmarks often use. Perhaps it should be a config option instead.
159163
for _ in 0..16 {
164+
if let Some(time) = &mut self.idle_timeout_fut {
165+
if time.as_mut().poll(cx).is_ready() {
166+
log::info!("conn is close");
167+
self.close();
168+
return Poll::Ready(Ok(()));
169+
}
170+
}
160171
let _ = self.poll_read(cx)?;
161172
let _ = self.poll_write(cx)?;
162173
let _ = self.poll_flush(cx)?;
163-
174+
if self.conn.is_idle() {
175+
let mut idle_timeout = Box::pin(tokio::time::sleep(Duration::from_secs(10)));
176+
let _ = idle_timeout.as_mut().poll(cx);
177+
self.idle_timeout_fut = Some(idle_timeout);
178+
log::info!("conn is idle");
179+
}
164180
// This could happen if reading paused before blocking on IO,
165181
// such as getting to the end of a framed message, but then
166182
// writing/flushing set the state back to Init. In that case,
@@ -247,6 +263,8 @@ where
247263
// dispatch is ready for a message, try to read one
248264
match ready!(self.conn.poll_read_head(cx)) {
249265
Some(Ok((mut head, body_len, wants))) => {
266+
log::info!("req is comming");
267+
self.idle_timeout_fut = None;
250268
let body = match body_len {
251269
DecodedLength::ZERO => Body::empty(),
252270
other => {

src/proto/h2/server.rs

+35-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::error::Error as StdError;
22
use std::marker::Unpin;
3+
use std::sync::{Arc, Mutex};
34
#[cfg(feature = "runtime")]
45
use std::time::Duration;
56

@@ -24,7 +25,7 @@ use crate::service::HttpService;
2425

2526
use crate::upgrade::{OnUpgrade, Pending, Upgraded};
2627
use crate::{Body, Response};
27-
28+
use tokio::time::Sleep;
2829
// Our defaults are chosen for the "majority" case, which usually are not
2930
// resource constrained, and so the spec default of 64kb can be too limiting
3031
// for performance.
@@ -100,6 +101,7 @@ where
100101
ping: Option<(ping::Recorder, ping::Ponger)>,
101102
conn: Connection<T, SendBuf<B::Data>>,
102103
closing: Option<crate::Error>,
104+
idle_timeot: Arc<Mutex<Pin<Box<Sleep>>>>
103105
}
104106

105107
impl<T, S, B, E> Server<T, S, B, E>
@@ -202,6 +204,9 @@ where
202204
ping,
203205
conn,
204206
closing: None,
207+
idle_timeot: Arc::new(Mutex::new(Box::pin(
208+
tokio::time::sleep(Duration::from_secs(10)
209+
))))
205210
})
206211
}
207212
State::Serving(ref mut srv) => {
@@ -224,6 +229,16 @@ where
224229
T: AsyncRead + AsyncWrite + Unpin,
225230
B: HttpBody + 'static,
226231
{
232+
233+
fn reset_idle(&self, time: Duration) {
234+
let mut sleep = self.idle_timeot.lock().unwrap();
235+
sleep.as_mut().reset(tokio::time::Instant::now() + time);
236+
}
237+
fn poll_idle(&self, cx: &mut task::Context<'_>)-> bool {
238+
let mut sleep = self.idle_timeot.lock().unwrap();
239+
sleep.as_mut().poll(cx).is_ready()
240+
}
241+
227242
fn poll_server<S, E>(
228243
&mut self,
229244
cx: &mut task::Context<'_>,
@@ -269,10 +284,16 @@ where
269284
break;
270285
}
271286
}
272-
287+
if self.poll_idle(cx) {
288+
log::info!("incoming connection complete11");
289+
self.conn.graceful_shutdown();
290+
log::info!("incoming connection complete");
291+
return Poll::Ready(Ok(()));
292+
}
273293
// When the service is ready, accepts an incoming request.
274294
match ready!(self.conn.poll_accept(cx)) {
275295
Some(Ok((req, mut respond))) => {
296+
self.reset_idle(Duration::from_secs(60 * 60 * 24));
276297
trace!("incoming request");
277298
let content_length = headers::content_length_parse_all(req.headers());
278299
let ping = self
@@ -316,8 +337,9 @@ where
316337
if let Some(protocol) = req.extensions_mut().remove::<h2::ext::Protocol>() {
317338
req.extensions_mut().insert(Protocol::from_inner(protocol));
318339
}
319-
320-
let fut = H2Stream::new(service.call(req), connect_parts, respond);
340+
log::info!("{:?}",respond.stream_id());
341+
let fut = H2Stream::new(service.call(req), connect_parts, respond,
342+
self.idle_timeot.clone());
321343
exec.execute_h2stream(fut);
322344
}
323345
Some(Err(e)) => {
@@ -373,6 +395,7 @@ pin_project! {
373395
reply: SendResponse<SendBuf<B::Data>>,
374396
#[pin]
375397
state: H2StreamState<F, B>,
398+
idle_timeot: Arc<Mutex<Pin<Box<Sleep>>>>,
376399
}
377400
}
378401

@@ -408,10 +431,12 @@ where
408431
fut: F,
409432
connect_parts: Option<ConnectParts>,
410433
respond: SendResponse<SendBuf<B::Data>>,
434+
idle_timeot: Arc<Mutex<Pin<Box<Sleep>>>>
411435
) -> H2Stream<F, B> {
412436
H2Stream {
413437
reply: respond,
414438
state: H2StreamState::Service { fut, connect_parts },
439+
idle_timeot
415440
}
416441
}
417442
}
@@ -534,7 +559,12 @@ where
534559
type Output = ();
535560

536561
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
537-
self.poll2(cx).map(|res| {
562+
let idle_timeot = self.idle_timeot.clone();
563+
let poll = self.poll2(cx);
564+
poll.map(|res| {
565+
log::info!("con is idle");
566+
let mut sleep = idle_timeot.lock().unwrap();
567+
sleep.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(10));
538568
if let Err(e) = res {
539569
debug!("stream error: {}", e);
540570
}

0 commit comments

Comments
 (0)