diff --git a/src/file/mod.rs b/src/file/mod.rs index e38749d..c615bd8 100644 --- a/src/file/mod.rs +++ b/src/file/mod.rs @@ -3,7 +3,7 @@ //! It can be useful for testing purposes, or if you want to keep track of sent messages. //! -use std::path::PathBuf; +use std::{path::PathBuf, time::Duration}; use async_std::fs::File; use async_std::path::Path; @@ -70,4 +70,12 @@ impl<'a> Transport<'a> for FileTransport { .await?; Ok(()) } + + async fn send_with_timeout( + &mut self, + email: SendableEmail, + _timeout: Option<&Duration>, + ) -> Self::Result { + self.send(email).await // Writing to a file does not have a timeout, so just ignore it. + } } diff --git a/src/lib.rs b/src/lib.rs index d9878e5..2815033 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,6 +33,7 @@ pub use crate::smtp::client::net::ClientTlsParameters; pub use crate::smtp::{ClientSecurity, SmtpClient, SmtpTransport}; use async_trait::async_trait; +use std::time::Duration; /// Transport method for emails #[async_trait] @@ -42,4 +43,10 @@ pub trait Transport<'a> { /// Sends the email async fn send(&mut self, email: SendableEmail) -> Self::Result; + + async fn send_with_timeout( + &mut self, + email: SendableEmail, + timeout: Option<&Duration>, + ) -> Self::Result; } diff --git a/src/sendmail/mod.rs b/src/sendmail/mod.rs index 6aadb0b..abedd90 100644 --- a/src/sendmail/mod.rs +++ b/src/sendmail/mod.rs @@ -10,7 +10,10 @@ use async_trait::async_trait; use log::info; use std::convert::AsRef; use std::io::prelude::*; -use std::process::{Command, Stdio}; +use std::{ + process::{Command, Stdio}, + time::Duration, +}; pub mod error; @@ -84,9 +87,17 @@ impl<'a> Transport<'a> for SendmailTransport { .await?; if output.status.success() { - return Ok(()); + Ok(()) + } else { + Err(error::Error::Client(String::from_utf8(output.stderr)?)) } + } - Err(error::Error::Client(String::from_utf8(output.stderr)?)) + async fn send_with_timeout( + &mut self, + email: SendableEmail, + _timeout: Option<&Duration>, + ) -> Self::Result { + self.send(email).await } } diff --git a/src/smtp/client/inner.rs b/src/smtp/client/inner.rs index b8e593a..26654bd 100644 --- a/src/smtp/client/inner.rs +++ b/src/smtp/client/inner.rs @@ -99,7 +99,7 @@ impl InnerClient { } /// Get the read and write timeout. - pub fn timeout(&mut self) -> Option<&Duration> { + pub fn timeout(&self) -> Option<&Duration> { self.timeout.as_ref() } @@ -172,35 +172,48 @@ impl InnerClient { /// Sends the message content. pub async fn message(mut self: Pin<&mut Self>, message: T) -> SmtpResult { - let mut codec = ClientCodec::new(); - - let mut message_reader = BufReader::new(message); + let timeout = self.timeout; + with_timeout(timeout.as_ref(), async { + let mut codec = ClientCodec::new(); + let mut message_reader = BufReader::new(message); - let mut message_bytes = Vec::new(); - message_reader.read_to_end(&mut message_bytes).await?; + let mut message_bytes = Vec::new(); + message_reader.read_to_end(&mut message_bytes).await?; - if self.stream.is_none() { - return Err(From::from("Connection closed")); - } - let this = self.as_mut().project(); - let _: Pin<&mut Option> = this.stream; + if self.stream.is_none() { + return Err(From::from("Connection closed")); + } + let this = self.as_mut().project(); + let _: Pin<&mut Option> = this.stream; - let mut stream = this.stream.as_pin_mut().ok_or(Error::NoStream)?; + let mut stream = this.stream.as_pin_mut().ok_or(Error::NoStream)?; - with_timeout(this.timeout.as_ref(), async move { codec.encode(&message_bytes, &mut stream).await?; stream.write_all(b"\r\n.\r\n").await?; - Ok(()) - }) - .await?; - self.read_response().await + self.read_response_no_timeout().await + }) + .await } /// Send the given SMTP command to the server. - pub async fn command(mut self: Pin<&mut Self>, command: C) -> SmtpResult { - self.as_mut().write(command.to_string().as_bytes()).await?; - self.read_response().await + pub async fn command(self: Pin<&mut Self>, command: C) -> SmtpResult { + let timeout = self.timeout; + self.command_with_timeout(command, timeout.as_ref()).await + } + + pub async fn command_with_timeout( + mut self: Pin<&mut Self>, + command: C, + timeout: Option<&Duration>, + ) -> SmtpResult { + with_timeout(timeout, async { + self.as_mut().write(command.to_string().as_bytes()).await?; + let res = self.read_response_no_timeout().await?; + + Ok(res) + }) + .await } /// Writes the given data to the server. @@ -212,12 +225,8 @@ impl InnerClient { let _: Pin<&mut Option> = this.stream; let mut stream = this.stream.as_pin_mut().ok_or(Error::NoStream)?; - with_timeout(this.timeout.as_ref(), async move { - stream.write_all(string).await?; - stream.flush().await?; - Ok(()) - }) - .await?; + stream.write_all(string).await?; + stream.flush().await?; debug!( ">> {}", @@ -227,7 +236,12 @@ impl InnerClient { } /// Read an SMTP response from the wire. - pub async fn read_response(mut self: Pin<&mut Self>) -> SmtpResult { + pub async fn read_response(self: Pin<&mut Self>) -> SmtpResult { + let timeout = self.timeout; + with_timeout(timeout.as_ref(), self.read_response_no_timeout()).await + } + + async fn read_response_no_timeout(mut self: Pin<&mut Self>) -> SmtpResult { let this = self.as_mut().project(); let stream = this.stream.as_pin_mut().ok_or(Error::NoStream)?; @@ -235,7 +249,7 @@ impl InnerClient { let mut buffer = String::with_capacity(100); loop { - let read = with_timeout(this.timeout.as_ref(), reader.read_line(&mut buffer)).await?; + let read = reader.read_line(&mut buffer).await?; if read == 0 { break; } @@ -263,17 +277,20 @@ impl InnerClient { } /// Execute io operations with an optional timeout using. -async fn with_timeout(timeout: Option<&Duration>, f: F) -> Result +pub(crate) async fn with_timeout( + timeout: Option<&Duration>, + f: F, +) -> std::result::Result where - F: Future>, + F: Future>, + E: From, { - let r = if let Some(timeout) = timeout { - async_std::io::timeout(*timeout, f).await? + if let Some(timeout) = timeout { + let res = async_std::future::timeout(*timeout, f).await??; + Ok(res) } else { - f.await? - }; - - Ok(r) + f.await + } } #[cfg(test)] diff --git a/src/smtp/smtp_client.rs b/src/smtp/smtp_client.rs index 4563878..f873117 100644 --- a/src/smtp/smtp_client.rs +++ b/src/smtp/smtp_client.rs @@ -306,7 +306,10 @@ impl<'a> SmtpTransport { .await?; client.set_timeout(self.client_info.timeout); - let _response = client.read_response().await?; + let _response = super::client::with_timeout(self.client_info.timeout.as_ref(), async { + client.read_response().await + }) + .await?; } self.post_connect().await @@ -498,6 +501,15 @@ impl<'a> Transport<'a> for SmtpTransport { /// Sends an email async fn send(&mut self, email: SendableEmail) -> SmtpResult { + let timeout = self.client.timeout().cloned(); + self.send_with_timeout(email, timeout.as_ref()).await + } + + async fn send_with_timeout( + &mut self, + email: SendableEmail, + timeout: Option<&Duration>, + ) -> Self::Result { let message_id = email.message_id().to_string(); // Mail @@ -516,10 +528,10 @@ impl<'a> Transport<'a> for SmtpTransport { try_smtp!( client .as_mut() - .command(MailCommand::new( - email.envelope().from().cloned(), - mail_options, - )) + .command_with_timeout( + MailCommand::new(email.envelope().from().cloned(), mail_options), + timeout + ) .await, self ); @@ -529,7 +541,7 @@ impl<'a> Transport<'a> for SmtpTransport { try_smtp!( client .as_mut() - .command(RcptCommand::new(to_address.clone(), vec![])) + .command_with_timeout(RcptCommand::new(to_address.clone(), vec![]), timeout) .await, self ); diff --git a/src/stub/mod.rs b/src/stub/mod.rs index 99e4675..c33ce48 100644 --- a/src/stub/mod.rs +++ b/src/stub/mod.rs @@ -7,6 +7,7 @@ use log::info; use crate::SendableEmail; use crate::Transport; +use std::time::Duration; /// This transport logs the message envelope and returns the given response #[derive(Debug, Clone, Copy)] @@ -45,4 +46,12 @@ impl<'a> Transport<'a> for StubTransport { ); self.response } + async fn send_with_timeout( + &mut self, + email: SendableEmail, + timeout: Option<&Duration>, + ) -> Self::Result { + info!("Timeout: {:?}", timeout); + self.send(email).await + } }