Skip to content

Commit e802ec2

Browse files
Alexander Krotovlink2xt
authored andcommitted
send_with_timeout: apply timeout to message sending
Previously all the commands, such as RCPT and DATA, were using custom user-supplied timeout, but the most expensive operation of sending the message data and waiting for before-queue content filters to finish were using default SMTP timeout. This commit also removes the timeout from message reading operation. It normally happens locally and otherwise I/O operations have their own timeouts, e.g. if the message is read from NFS mount.
1 parent 42e5e03 commit e802ec2

File tree

2 files changed

+26
-17
lines changed

2 files changed

+26
-17
lines changed

src/smtp/client/inner.rs

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -171,29 +171,35 @@ impl<S: Connector + Write + Read + Unpin> InnerClient<S> {
171171
}
172172

173173
/// Sends the message content.
174-
pub async fn message<T: Read + Unpin>(mut self: Pin<&mut Self>, message: T) -> SmtpResult {
175-
let timeout = self.timeout;
176-
with_timeout(timeout.as_ref(), async {
177-
let mut codec = ClientCodec::new();
178-
let mut message_reader = BufReader::new(message);
174+
pub(crate) async fn message_with_timeout<T: Read + Unpin>(
175+
mut self: Pin<&mut Self>,
176+
message: T,
177+
timeout: Option<&Duration>,
178+
) -> SmtpResult {
179+
let mut codec = ClientCodec::new();
179180

180-
let mut message_bytes = Vec::new();
181-
message_reader.read_to_end(&mut message_bytes).await?;
181+
let mut message_reader = BufReader::new(message);
182182

183-
if self.stream.is_none() {
184-
return Err(From::from("Connection closed"));
185-
}
186-
let this = self.as_mut().project();
187-
let _: Pin<&mut Option<S>> = this.stream;
183+
let mut message_bytes = Vec::new();
184+
message_reader.read_to_end(&mut message_bytes).await?;
185+
186+
if self.stream.is_none() {
187+
return Err(From::from("Connection closed"));
188+
}
189+
let this = self.as_mut().project();
190+
let _: Pin<&mut Option<S>> = this.stream;
188191

189-
let mut stream = this.stream.as_pin_mut().ok_or(Error::NoStream)?;
192+
let mut stream = this.stream.as_pin_mut().ok_or(Error::NoStream)?;
190193

194+
let res: Result<(), Error> = with_timeout(timeout, async {
191195
codec.encode(&message_bytes, &mut stream).await?;
192196
stream.write_all(b"\r\n.\r\n").await?;
193-
194-
self.read_response_no_timeout().await
197+
Ok(())
195198
})
196-
.await
199+
.await;
200+
res?;
201+
202+
with_timeout(timeout, self.read_response_no_timeout()).await
197203
}
198204

199205
/// Send the given SMTP command to the server.

src/smtp/smtp_client.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -552,7 +552,10 @@ impl<'a> Transport<'a> for SmtpTransport {
552552
// Data
553553
try_smtp!(client.as_mut().command(DataCommand).await, self);
554554

555-
let res = client.as_mut().message(email.message()).await;
555+
let res = client
556+
.as_mut()
557+
.message_with_timeout(email.message(), timeout)
558+
.await;
556559

557560
// Message content
558561
if let Ok(result) = &res {

0 commit comments

Comments
 (0)