Skip to content

Commit e3cbfe9

Browse files
authored
Merge pull request containerd#89 from Tim-Zhang/server-timeout
Add server timeout, remove client timeout
2 parents dcbdec8 + f2bbbee commit e3cbfe9

File tree

3 files changed

+32
-20
lines changed

3 files changed

+32
-20
lines changed

example/async-client.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ async fn main() {
4545
println!(
4646
"Green Thread 1 - {} -> {:?} ended: {:?}",
4747
"health.check()",
48-
thc.check(default_ctx(), &req).await,
48+
thc.check(context::with_timeout(20 * 1000 * 1000), &req)
49+
.await,
4950
now.elapsed(),
5051
);
5152
});

src/asynchronous/client.rs

+4-17
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use protobuf::{CodedInputStream, CodedOutputStream, Message};
88
use std::collections::HashMap;
99
use std::os::unix::io::RawFd;
1010
use std::sync::{Arc, Mutex};
11-
use std::time::Duration;
1211

1312
use crate::common::MESSAGE_TYPE_RESPONSE;
1413
use crate::error::{Error, Result};
@@ -21,7 +20,6 @@ use tokio::{
2120
io::{split, AsyncWriteExt},
2221
sync::mpsc::{channel, Receiver, Sender},
2322
sync::Notify,
24-
time::timeout,
2523
};
2624

2725
type RequestSender = Sender<(Vec<u8>, Sender<Result<Vec<u8>>>)>;
@@ -158,21 +156,10 @@ impl Client {
158156
.await
159157
.map_err(|e| Error::Others(format!("Send packet to sender error {:?}", e)))?;
160158

161-
let result: Result<Vec<u8>> = if req.timeout_nano == 0 {
162-
rx.recv()
163-
.await
164-
.ok_or_else(|| Error::Others("Recive packet from recver error".to_string()))?
165-
} else {
166-
match timeout(Duration::from_nanos(req.timeout_nano as u64), rx.recv()).await {
167-
Ok(result) => result
168-
.ok_or_else(|| Error::Others("Recive packet from recver error".to_string()))?,
169-
Err(_) => {
170-
return Err(Error::Others(
171-
"Recive packet from recver error: timeout".to_string(),
172-
))
173-
}
174-
}
175-
};
159+
let result = rx
160+
.recv()
161+
.await
162+
.ok_or_else(|| Error::Others("Recive packet from recver error".to_string()))?;
176163

177164
let buf = result?;
178165
let mut s = CodedInputStream::from_bytes(&buf);

src/asynchronous/server.rs

+26-2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::collections::HashMap;
99
use std::os::unix::io::RawFd;
1010
use std::result::Result as StdResult;
1111
use std::sync::Arc;
12+
use std::time::Duration;
1213

1314
use crate::asynchronous::stream::{receive, respond, respond_with_status};
1415
use crate::asynchronous::unix_incoming::UnixIncoming;
@@ -30,6 +31,7 @@ use tokio::{
3031
select, spawn,
3132
sync::mpsc::{channel, Receiver, Sender},
3233
sync::watch,
34+
time::timeout,
3335
};
3436
use tokio_vsock::VsockListener;
3537

@@ -322,10 +324,32 @@ async fn do_handle_request(
322324
metadata: context::from_pb(&req.metadata),
323325
};
324326

325-
method.handler(ctx, req).await.map_err(|e| {
327+
let get_unknown_status_and_log_err = |e| {
326328
error!("method handle {} got error {:?}", path, &e);
327329
get_status(Code::UNKNOWN, e)
328-
})
330+
};
331+
332+
if req.timeout_nano == 0 {
333+
method
334+
.handler(ctx, req)
335+
.await
336+
.map_err(get_unknown_status_and_log_err)
337+
} else {
338+
timeout(
339+
Duration::from_nanos(req.timeout_nano as u64),
340+
method.handler(ctx, req),
341+
)
342+
.await
343+
.map_err(|_| {
344+
// Timed out
345+
error!("method handle {} got error timed out", path);
346+
get_status(Code::DEADLINE_EXCEEDED, "timeout")
347+
})
348+
.and_then(|r| {
349+
// Handler finished
350+
r.map_err(get_unknown_status_and_log_err)
351+
})
352+
}
329353
}
330354

331355
async fn handle_request(

0 commit comments

Comments
 (0)