Skip to content

Commit c167dbe

Browse files
more flexible timeouts
Co-authored-by: dignifiedquire <[email protected]>
1 parent 8e98b9a commit c167dbe

File tree

6 files changed

+110
-46
lines changed

6 files changed

+110
-46
lines changed

src/file/mod.rs

+9-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
//! It can be useful for testing purposes, or if you want to keep track of sent messages.
44
//!
55
6-
use std::path::PathBuf;
6+
use std::{path::PathBuf, time::Duration};
77

88
use async_std::fs::File;
99
use async_std::path::Path;
@@ -70,4 +70,12 @@ impl<'a> Transport<'a> for FileTransport {
7070
.await?;
7171
Ok(())
7272
}
73+
74+
async fn send_with_timeout(
75+
&mut self,
76+
email: SendableEmail,
77+
_timeout: Option<&Duration>,
78+
) -> Self::Result {
79+
self.send(email).await // Writing to a file does not have a timeout, so just ignore it.
80+
}
7381
}

src/lib.rs

+7
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pub use crate::smtp::client::net::ClientTlsParameters;
3333
pub use crate::smtp::{ClientSecurity, SmtpClient, SmtpTransport};
3434

3535
use async_trait::async_trait;
36+
use std::time::Duration;
3637

3738
/// Transport method for emails
3839
#[async_trait]
@@ -42,4 +43,10 @@ pub trait Transport<'a> {
4243

4344
/// Sends the email
4445
async fn send(&mut self, email: SendableEmail) -> Self::Result;
46+
47+
async fn send_with_timeout(
48+
&mut self,
49+
email: SendableEmail,
50+
timeout: Option<&Duration>,
51+
) -> Self::Result;
4552
}

src/sendmail/mod.rs

+14-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ use async_trait::async_trait;
1010
use log::info;
1111
use std::convert::AsRef;
1212
use std::io::prelude::*;
13-
use std::process::{Command, Stdio};
13+
use std::{
14+
process::{Command, Stdio},
15+
time::Duration,
16+
};
1417

1518
pub mod error;
1619

@@ -84,9 +87,17 @@ impl<'a> Transport<'a> for SendmailTransport {
8487
.await?;
8588

8689
if output.status.success() {
87-
return Ok(());
90+
Ok(())
91+
} else {
92+
Err(error::Error::Client(String::from_utf8(output.stderr)?))
8893
}
94+
}
8995

90-
Err(error::Error::Client(String::from_utf8(output.stderr)?))
96+
async fn send_with_timeout(
97+
&mut self,
98+
email: SendableEmail,
99+
_timeout: Option<&Duration>,
100+
) -> Self::Result {
101+
self.send(email).await
91102
}
92103
}

src/smtp/client/inner.rs

+53-36
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ impl<S: Connector + Write + Read + Unpin> InnerClient<S> {
9999
}
100100

101101
/// Get the read and write timeout.
102-
pub fn timeout(&mut self) -> Option<&Duration> {
102+
pub fn timeout(&self) -> Option<&Duration> {
103103
self.timeout.as_ref()
104104
}
105105

@@ -172,35 +172,48 @@ impl<S: Connector + Write + Read + Unpin> InnerClient<S> {
172172

173173
/// Sends the message content.
174174
pub async fn message<T: Read + Unpin>(mut self: Pin<&mut Self>, message: T) -> SmtpResult {
175-
let mut codec = ClientCodec::new();
176-
177-
let mut message_reader = BufReader::new(message);
175+
let timeout = self.timeout;
176+
with_timeout(timeout.as_ref(), async {
177+
let mut codec = ClientCodec::new();
178+
let mut message_reader = BufReader::new(message);
178179

179-
let mut message_bytes = Vec::new();
180-
message_reader.read_to_end(&mut message_bytes).await?;
180+
let mut message_bytes = Vec::new();
181+
message_reader.read_to_end(&mut message_bytes).await?;
181182

182-
if self.stream.is_none() {
183-
return Err(From::from("Connection closed"));
184-
}
185-
let this = self.as_mut().project();
186-
let _: Pin<&mut Option<S>> = this.stream;
183+
if self.stream.is_none() {
184+
return Err(From::from("Connection closed"));
185+
}
186+
let this = self.as_mut().project();
187+
let _: Pin<&mut Option<S>> = this.stream;
187188

188-
let mut stream = this.stream.as_pin_mut().ok_or(Error::NoStream)?;
189+
let mut stream = this.stream.as_pin_mut().ok_or(Error::NoStream)?;
189190

190-
with_timeout(this.timeout.as_ref(), async move {
191191
codec.encode(&message_bytes, &mut stream).await?;
192192
stream.write_all(b"\r\n.\r\n").await?;
193-
Ok(())
194-
})
195-
.await?;
196193

197-
self.read_response().await
194+
self.read_response_no_timeout().await
195+
})
196+
.await
198197
}
199198

200199
/// Send the given SMTP command to the server.
201-
pub async fn command<C: Display>(mut self: Pin<&mut Self>, command: C) -> SmtpResult {
202-
self.as_mut().write(command.to_string().as_bytes()).await?;
203-
self.read_response().await
200+
pub async fn command<C: Display>(self: Pin<&mut Self>, command: C) -> SmtpResult {
201+
let timeout = self.timeout;
202+
self.command_with_timeout(command, timeout.as_ref()).await
203+
}
204+
205+
pub async fn command_with_timeout<C: Display>(
206+
mut self: Pin<&mut Self>,
207+
command: C,
208+
timeout: Option<&Duration>,
209+
) -> SmtpResult {
210+
with_timeout(timeout, async {
211+
self.as_mut().write(command.to_string().as_bytes()).await?;
212+
let res = self.read_response_no_timeout().await?;
213+
214+
Ok(res)
215+
})
216+
.await
204217
}
205218

206219
/// Writes the given data to the server.
@@ -212,12 +225,8 @@ impl<S: Connector + Write + Read + Unpin> InnerClient<S> {
212225
let _: Pin<&mut Option<S>> = this.stream;
213226
let mut stream = this.stream.as_pin_mut().ok_or(Error::NoStream)?;
214227

215-
with_timeout(this.timeout.as_ref(), async move {
216-
stream.write_all(string).await?;
217-
stream.flush().await?;
218-
Ok(())
219-
})
220-
.await?;
228+
stream.write_all(string).await?;
229+
stream.flush().await?;
221230

222231
debug!(
223232
">> {}",
@@ -227,15 +236,20 @@ impl<S: Connector + Write + Read + Unpin> InnerClient<S> {
227236
}
228237

229238
/// Read an SMTP response from the wire.
230-
pub async fn read_response(mut self: Pin<&mut Self>) -> SmtpResult {
239+
pub async fn read_response(self: Pin<&mut Self>) -> SmtpResult {
240+
let timeout = self.timeout;
241+
with_timeout(timeout.as_ref(), self.read_response_no_timeout()).await
242+
}
243+
244+
async fn read_response_no_timeout(mut self: Pin<&mut Self>) -> SmtpResult {
231245
let this = self.as_mut().project();
232246
let stream = this.stream.as_pin_mut().ok_or(Error::NoStream)?;
233247

234248
let mut reader = BufReader::new(stream);
235249
let mut buffer = String::with_capacity(100);
236250

237251
loop {
238-
let read = with_timeout(this.timeout.as_ref(), reader.read_line(&mut buffer)).await?;
252+
let read = reader.read_line(&mut buffer).await?;
239253
if read == 0 {
240254
break;
241255
}
@@ -263,17 +277,20 @@ impl<S: Connector + Write + Read + Unpin> InnerClient<S> {
263277
}
264278

265279
/// Execute io operations with an optional timeout using.
266-
async fn with_timeout<T, F>(timeout: Option<&Duration>, f: F) -> Result<T, Error>
280+
pub(crate) async fn with_timeout<T, F, E>(
281+
timeout: Option<&Duration>,
282+
f: F,
283+
) -> std::result::Result<T, E>
267284
where
268-
F: Future<Output = async_std::io::Result<T>>,
285+
F: Future<Output = std::result::Result<T, E>>,
286+
E: From<async_std::future::TimeoutError>,
269287
{
270-
let r = if let Some(timeout) = timeout {
271-
async_std::io::timeout(*timeout, f).await?
288+
if let Some(timeout) = timeout {
289+
let res = async_std::future::timeout(*timeout, f).await??;
290+
Ok(res)
272291
} else {
273-
f.await?
274-
};
275-
276-
Ok(r)
292+
f.await
293+
}
277294
}
278295

279296
#[cfg(test)]

src/smtp/smtp_client.rs

+18-6
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,10 @@ impl<'a> SmtpTransport {
306306
.await?;
307307

308308
client.set_timeout(self.client_info.timeout);
309-
let _response = client.read_response().await?;
309+
let _response = super::client::with_timeout(self.client_info.timeout.as_ref(), async {
310+
client.read_response().await
311+
})
312+
.await?;
310313
}
311314

312315
self.post_connect().await
@@ -498,6 +501,15 @@ impl<'a> Transport<'a> for SmtpTransport {
498501

499502
/// Sends an email
500503
async fn send(&mut self, email: SendableEmail) -> SmtpResult {
504+
let timeout = self.client.timeout().cloned();
505+
self.send_with_timeout(email, timeout.as_ref()).await
506+
}
507+
508+
async fn send_with_timeout(
509+
&mut self,
510+
email: SendableEmail,
511+
timeout: Option<&Duration>,
512+
) -> Self::Result {
501513
let message_id = email.message_id().to_string();
502514

503515
// Mail
@@ -516,10 +528,10 @@ impl<'a> Transport<'a> for SmtpTransport {
516528
try_smtp!(
517529
client
518530
.as_mut()
519-
.command(MailCommand::new(
520-
email.envelope().from().cloned(),
521-
mail_options,
522-
))
531+
.command_with_timeout(
532+
MailCommand::new(email.envelope().from().cloned(), mail_options),
533+
timeout
534+
)
523535
.await,
524536
self
525537
);
@@ -529,7 +541,7 @@ impl<'a> Transport<'a> for SmtpTransport {
529541
try_smtp!(
530542
client
531543
.as_mut()
532-
.command(RcptCommand::new(to_address.clone(), vec![]))
544+
.command_with_timeout(RcptCommand::new(to_address.clone(), vec![]), timeout)
533545
.await,
534546
self
535547
);

src/stub/mod.rs

+9
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use log::info;
77

88
use crate::SendableEmail;
99
use crate::Transport;
10+
use std::time::Duration;
1011

1112
/// This transport logs the message envelope and returns the given response
1213
#[derive(Debug, Clone, Copy)]
@@ -45,4 +46,12 @@ impl<'a> Transport<'a> for StubTransport {
4546
);
4647
self.response
4748
}
49+
async fn send_with_timeout(
50+
&mut self,
51+
email: SendableEmail,
52+
timeout: Option<&Duration>,
53+
) -> Self::Result {
54+
info!("Timeout: {:?}", timeout);
55+
self.send(email).await
56+
}
4857
}

0 commit comments

Comments
 (0)