From e0ac5fd507603ea65688e10a559ef1a5cca8be51 Mon Sep 17 00:00:00 2001 From: Alexander Krotov Date: Tue, 4 Aug 2020 02:11:09 +0300 Subject: [PATCH 1/2] command_with_timeout: apply timeouts separately to read and write Timeout should be restarted after successful write operation. --- src/smtp/client/inner.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/smtp/client/inner.rs b/src/smtp/client/inner.rs index 26654bd..01e13aa 100644 --- a/src/smtp/client/inner.rs +++ b/src/smtp/client/inner.rs @@ -207,13 +207,8 @@ impl InnerClient { command: C, timeout: Option<&Duration>, ) -> SmtpResult { - with_timeout(timeout, async { - self.as_mut().write(command.to_string().as_bytes()).await?; - let res = self.read_response_no_timeout().await?; - - Ok(res) - }) - .await + with_timeout(timeout, self.as_mut().write(command.to_string().as_bytes())).await?; + with_timeout(timeout, self.read_response_no_timeout()).await } /// Writes the given data to the server. From f840a6dd279f94a11c988665293c026e219a2611 Mon Sep 17 00:00:00 2001 From: Alexander Krotov Date: Tue, 4 Aug 2020 02:14:31 +0300 Subject: [PATCH 2/2] send_with_timeout: apply timeout to message sending Previously all the commands, such as RCPT and DATA, were using custom user-supplied timeout, but the most expensive operation of sending the message data and waiting for before-queue content filters to finish were using default SMTP timeout. This commit also removes the timeout from message reading operation. It normally happens locally and otherwise I/O operations have their own timeouts, e.g. if the message is read from NFS mount. --- src/smtp/client/inner.rs | 38 ++++++++++++++++++++++---------------- src/smtp/smtp_client.rs | 5 ++++- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/src/smtp/client/inner.rs b/src/smtp/client/inner.rs index 01e13aa..fabd77f 100644 --- a/src/smtp/client/inner.rs +++ b/src/smtp/client/inner.rs @@ -171,29 +171,35 @@ impl InnerClient { } /// Sends the message content. - pub async fn message(mut self: Pin<&mut Self>, message: T) -> SmtpResult { - let timeout = self.timeout; - with_timeout(timeout.as_ref(), async { - let mut codec = ClientCodec::new(); - let mut message_reader = BufReader::new(message); + pub(crate) async fn message_with_timeout( + mut self: Pin<&mut Self>, + message: T, + timeout: Option<&Duration>, + ) -> SmtpResult { + let mut codec = ClientCodec::new(); - let mut message_bytes = Vec::new(); - message_reader.read_to_end(&mut message_bytes).await?; + let mut message_reader = BufReader::new(message); - if self.stream.is_none() { - return Err(From::from("Connection closed")); - } - let this = self.as_mut().project(); - let _: Pin<&mut Option> = this.stream; + let mut message_bytes = Vec::new(); + message_reader.read_to_end(&mut message_bytes).await?; + + if self.stream.is_none() { + return Err(From::from("Connection closed")); + } + let this = self.as_mut().project(); + let _: Pin<&mut Option> = this.stream; - let mut stream = this.stream.as_pin_mut().ok_or(Error::NoStream)?; + let mut stream = this.stream.as_pin_mut().ok_or(Error::NoStream)?; + let res: Result<(), Error> = with_timeout(timeout, async { codec.encode(&message_bytes, &mut stream).await?; stream.write_all(b"\r\n.\r\n").await?; - - self.read_response_no_timeout().await + Ok(()) }) - .await + .await; + res?; + + with_timeout(timeout, self.read_response_no_timeout()).await } /// Send the given SMTP command to the server. diff --git a/src/smtp/smtp_client.rs b/src/smtp/smtp_client.rs index f873117..f662cc5 100644 --- a/src/smtp/smtp_client.rs +++ b/src/smtp/smtp_client.rs @@ -552,7 +552,10 @@ impl<'a> Transport<'a> for SmtpTransport { // Data try_smtp!(client.as_mut().command(DataCommand).await, self); - let res = client.as_mut().message(email.message()).await; + let res = client + .as_mut() + .message_with_timeout(email.message(), timeout) + .await; // Message content if let Ok(result) = &res {