Skip to content

more flexible timeouts #26

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! It can be useful for testing purposes, or if you want to keep track of sent messages.
//!

use std::path::PathBuf;
use std::{path::PathBuf, time::Duration};

use async_std::fs::File;
use async_std::path::Path;
Expand Down Expand Up @@ -70,4 +70,12 @@ impl<'a> Transport<'a> for FileTransport {
.await?;
Ok(())
}

async fn send_with_timeout(
&mut self,
email: SendableEmail,
_timeout: Option<&Duration>,
) -> Self::Result {
self.send(email).await // Writing to a file does not have a timeout, so just ignore it.
}
}
7 changes: 7 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub use crate::smtp::client::net::ClientTlsParameters;
pub use crate::smtp::{ClientSecurity, SmtpClient, SmtpTransport};

use async_trait::async_trait;
use std::time::Duration;

/// Transport method for emails
#[async_trait]
Expand All @@ -42,4 +43,10 @@ pub trait Transport<'a> {

/// Sends the email
async fn send(&mut self, email: SendableEmail) -> Self::Result;

async fn send_with_timeout(
&mut self,
email: SendableEmail,
timeout: Option<&Duration>,
) -> Self::Result;
}
17 changes: 14 additions & 3 deletions src/sendmail/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use async_trait::async_trait;
use log::info;
use std::convert::AsRef;
use std::io::prelude::*;
use std::process::{Command, Stdio};
use std::{
process::{Command, Stdio},
time::Duration,
};

pub mod error;

Expand Down Expand Up @@ -84,9 +87,17 @@ impl<'a> Transport<'a> for SendmailTransport {
.await?;

if output.status.success() {
return Ok(());
Ok(())
} else {
Err(error::Error::Client(String::from_utf8(output.stderr)?))
}
}

Err(error::Error::Client(String::from_utf8(output.stderr)?))
async fn send_with_timeout(
&mut self,
email: SendableEmail,
_timeout: Option<&Duration>,
) -> Self::Result {
self.send(email).await
}
}
89 changes: 53 additions & 36 deletions src/smtp/client/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl<S: Connector + Write + Read + Unpin> InnerClient<S> {
}

/// Get the read and write timeout.
pub fn timeout(&mut self) -> Option<&Duration> {
pub fn timeout(&self) -> Option<&Duration> {
self.timeout.as_ref()
}

Expand Down Expand Up @@ -172,35 +172,48 @@ impl<S: Connector + Write + Read + Unpin> InnerClient<S> {

/// Sends the message content.
pub async fn message<T: Read + Unpin>(mut self: Pin<&mut Self>, message: T) -> SmtpResult {
let mut codec = ClientCodec::new();

let mut message_reader = BufReader::new(message);
let timeout = self.timeout;
with_timeout(timeout.as_ref(), async {
let mut codec = ClientCodec::new();
let mut message_reader = BufReader::new(message);

let mut message_bytes = Vec::new();
message_reader.read_to_end(&mut message_bytes).await?;
let mut message_bytes = Vec::new();
message_reader.read_to_end(&mut message_bytes).await?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this wrapped into timeout now? It is a completely local operation.


if self.stream.is_none() {
return Err(From::from("Connection closed"));
}
let this = self.as_mut().project();
let _: Pin<&mut Option<S>> = this.stream;
if self.stream.is_none() {
return Err(From::from("Connection closed"));
}
let this = self.as_mut().project();
let _: Pin<&mut Option<S>> = this.stream;

let mut stream = this.stream.as_pin_mut().ok_or(Error::NoStream)?;
let mut stream = this.stream.as_pin_mut().ok_or(Error::NoStream)?;

with_timeout(this.timeout.as_ref(), async move {
codec.encode(&message_bytes, &mut stream).await?;
stream.write_all(b"\r\n.\r\n").await?;
Ok(())
})
.await?;

self.read_response().await
self.read_response_no_timeout().await
})
.await
}

/// Send the given SMTP command to the server.
pub async fn command<C: Display>(mut self: Pin<&mut Self>, command: C) -> SmtpResult {
self.as_mut().write(command.to_string().as_bytes()).await?;
self.read_response().await
pub async fn command<C: Display>(self: Pin<&mut Self>, command: C) -> SmtpResult {
let timeout = self.timeout;
self.command_with_timeout(command, timeout.as_ref()).await
}

pub async fn command_with_timeout<C: Display>(
mut self: Pin<&mut Self>,
command: C,
timeout: Option<&Duration>,
) -> SmtpResult {
with_timeout(timeout, async {
self.as_mut().write(command.to_string().as_bytes()).await?;
let res = self.read_response_no_timeout().await?;

Ok(res)
})
.await
}

/// Writes the given data to the server.
Expand All @@ -212,12 +225,8 @@ impl<S: Connector + Write + Read + Unpin> InnerClient<S> {
let _: Pin<&mut Option<S>> = this.stream;
let mut stream = this.stream.as_pin_mut().ok_or(Error::NoStream)?;

with_timeout(this.timeout.as_ref(), async move {
stream.write_all(string).await?;
stream.flush().await?;
Ok(())
})
.await?;
stream.write_all(string).await?;
stream.flush().await?;

debug!(
">> {}",
Expand All @@ -227,15 +236,20 @@ impl<S: Connector + Write + Read + Unpin> InnerClient<S> {
}

/// Read an SMTP response from the wire.
pub async fn read_response(mut self: Pin<&mut Self>) -> SmtpResult {
pub async fn read_response(self: Pin<&mut Self>) -> SmtpResult {
let timeout = self.timeout;
with_timeout(timeout.as_ref(), self.read_response_no_timeout()).await
}

async fn read_response_no_timeout(mut self: Pin<&mut Self>) -> SmtpResult {
let this = self.as_mut().project();
let stream = this.stream.as_pin_mut().ok_or(Error::NoStream)?;

let mut reader = BufReader::new(stream);
let mut buffer = String::with_capacity(100);

loop {
let read = with_timeout(this.timeout.as_ref(), reader.read_line(&mut buffer)).await?;
let read = reader.read_line(&mut buffer).await?;
if read == 0 {
break;
}
Expand Down Expand Up @@ -263,17 +277,20 @@ impl<S: Connector + Write + Read + Unpin> InnerClient<S> {
}

/// Execute io operations with an optional timeout using.
async fn with_timeout<T, F>(timeout: Option<&Duration>, f: F) -> Result<T, Error>
pub(crate) async fn with_timeout<T, F, E>(
timeout: Option<&Duration>,
f: F,
) -> std::result::Result<T, E>
where
F: Future<Output = async_std::io::Result<T>>,
F: Future<Output = std::result::Result<T, E>>,
E: From<async_std::future::TimeoutError>,
{
let r = if let Some(timeout) = timeout {
async_std::io::timeout(*timeout, f).await?
if let Some(timeout) = timeout {
let res = async_std::future::timeout(*timeout, f).await??;
Ok(res)
} else {
f.await?
};

Ok(r)
f.await
}
}

#[cfg(test)]
Expand Down
24 changes: 18 additions & 6 deletions src/smtp/smtp_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,10 @@ impl<'a> SmtpTransport {
.await?;

client.set_timeout(self.client_info.timeout);
let _response = client.read_response().await?;
let _response = super::client::with_timeout(self.client_info.timeout.as_ref(), async {
client.read_response().await
})
.await?;
}

self.post_connect().await
Expand Down Expand Up @@ -498,6 +501,15 @@ impl<'a> Transport<'a> for SmtpTransport {

/// Sends an email
async fn send(&mut self, email: SendableEmail) -> SmtpResult {
let timeout = self.client.timeout().cloned();
self.send_with_timeout(email, timeout.as_ref()).await
}

async fn send_with_timeout(
&mut self,
email: SendableEmail,
timeout: Option<&Duration>,
) -> Self::Result {
let message_id = email.message_id().to_string();

// Mail
Expand All @@ -516,10 +528,10 @@ impl<'a> Transport<'a> for SmtpTransport {
try_smtp!(
client
.as_mut()
.command(MailCommand::new(
email.envelope().from().cloned(),
mail_options,
))
.command_with_timeout(
MailCommand::new(email.envelope().from().cloned(), mail_options),
timeout
)
.await,
self
);
Expand All @@ -529,7 +541,7 @@ impl<'a> Transport<'a> for SmtpTransport {
try_smtp!(
client
.as_mut()
.command(RcptCommand::new(to_address.clone(), vec![]))
.command_with_timeout(RcptCommand::new(to_address.clone(), vec![]), timeout)
.await,
self
);
Expand Down
9 changes: 9 additions & 0 deletions src/stub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use log::info;

use crate::SendableEmail;
use crate::Transport;
use std::time::Duration;

/// This transport logs the message envelope and returns the given response
#[derive(Debug, Clone, Copy)]
Expand Down Expand Up @@ -45,4 +46,12 @@ impl<'a> Transport<'a> for StubTransport {
);
self.response
}
async fn send_with_timeout(
&mut self,
email: SendableEmail,
timeout: Option<&Duration>,
) -> Self::Result {
info!("Timeout: {:?}", timeout);
self.send(email).await
}
}