Skip to content

Commit 03f6ede

Browse files
committed
fix: Prefetch messages in limited batches (#6915)
I have logs from a user where messages are prefetched for long minutes, and while it's not a problem on its own, we can't rely that the connection overlives such a period, so make `fetch_new_messages()` prefetch (and then actually download) messages in batches of 500 messages.
1 parent df2c35b commit 03f6ede

File tree

2 files changed

+44
-19
lines changed

2 files changed

+44
-19
lines changed

src/imap.rs

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -566,10 +566,38 @@ impl Imap {
566566
}
567567
session.new_mail = false;
568568

569+
let mut read_cnt = 0;
570+
loop {
571+
let (n, fetch_more) = self
572+
.fetch_new_msg_batch(context, session, folder, folder_meaning)
573+
.await?;
574+
read_cnt += n;
575+
if !fetch_more {
576+
return Ok(read_cnt > 0);
577+
}
578+
}
579+
}
580+
581+
/// Returns number of messages processed and whether the function should be called again.
582+
async fn fetch_new_msg_batch(
583+
&mut self,
584+
context: &Context,
585+
session: &mut Session,
586+
folder: &str,
587+
folder_meaning: FolderMeaning,
588+
) -> Result<(usize, bool)> {
569589
let uid_validity = get_uidvalidity(context, folder).await?;
570590
let old_uid_next = get_uid_next(context, folder).await?;
591+
info!(
592+
context,
593+
"fetch_new_msg_batch({folder}): UIDVALIDITY={uid_validity}, UIDNEXT={old_uid_next}."
594+
);
571595

572-
let msgs = session.prefetch(old_uid_next).await.context("prefetch")?;
596+
let uids_to_prefetch = 500;
597+
let msgs = session
598+
.prefetch(old_uid_next, uids_to_prefetch)
599+
.await
600+
.context("prefetch")?;
573601
let read_cnt = msgs.len();
574602

575603
let download_limit = context.download_limit().await?;
@@ -729,7 +757,8 @@ impl Imap {
729757
largest_uid_fetched
730758
};
731759

732-
let actually_download_messages_future = async move {
760+
let actually_download_messages_future = async {
761+
let sender = sender;
733762
let mut uids_fetch_in_batch = Vec::with_capacity(max(uids_fetch.len(), 1));
734763
let mut fetch_partially = false;
735764
uids_fetch.push((0, !uids_fetch.last().unwrap_or(&(0, false)).1));
@@ -764,14 +793,17 @@ impl Imap {
764793
// if the message has arrived after selecting mailbox
765794
// and determining its UIDNEXT and before prefetch.
766795
let mut new_uid_next = largest_uid_fetched + 1;
767-
if fetch_res.is_ok() {
796+
let fetch_more = fetch_res.is_ok() && {
797+
let prefetch_uid_next = old_uid_next + uids_to_prefetch;
768798
// If we have successfully fetched all messages we planned during prefetch,
769799
// then we have covered at least the range between old UIDNEXT
770800
// and UIDNEXT of the mailbox at the time of selecting it.
771-
new_uid_next = max(new_uid_next, mailbox_uid_next);
801+
new_uid_next = max(new_uid_next, min(prefetch_uid_next, mailbox_uid_next));
772802

773803
new_uid_next = max(new_uid_next, largest_uid_skipped.unwrap_or(0) + 1);
774-
}
804+
805+
prefetch_uid_next < mailbox_uid_next
806+
};
775807
if new_uid_next > old_uid_next {
776808
set_uid_next(context, folder, new_uid_next).await?;
777809
}
@@ -788,7 +820,7 @@ impl Imap {
788820
// establish a new session if this one is broken.
789821
fetch_res?;
790822

791-
Ok(read_cnt > 0)
823+
Ok((read_cnt, fetch_more))
792824
}
793825

794826
/// Read the recipients from old emails sent by the user and add them as contacts.

src/imap/session.rs

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -110,14 +110,16 @@ impl Session {
110110
Ok(list)
111111
}
112112

113-
/// Prefetch all messages greater than or equal to `uid_next`. Returns a list of fetch results
114-
/// in the order of ascending delivery time to the server (INTERNALDATE).
113+
/// Prefetch `n_uids` messages starting from `uid_next`. Returns a list of fetch results in the
114+
/// order of ascending delivery time to the server (INTERNALDATE).
115115
pub(crate) async fn prefetch(
116116
&mut self,
117117
uid_next: u32,
118+
n_uids: u32,
118119
) -> Result<Vec<(u32, async_imap::types::Fetch)>> {
120+
let uid_last = uid_next.saturating_add(n_uids - 1);
119121
// fetch messages with larger UID than the last one seen
120-
let set = format!("{uid_next}:*");
122+
let set = format!("{uid_next}:{uid_last}");
121123
let mut list = self
122124
.uid_fetch(set, PREFETCH_FLAGS)
123125
.await
@@ -126,16 +128,7 @@ impl Session {
126128
let mut msgs = BTreeMap::new();
127129
while let Some(msg) = list.try_next().await? {
128130
if let Some(msg_uid) = msg.uid {
129-
// If the mailbox is not empty, results always include
130-
// at least one UID, even if last_seen_uid+1 is past
131-
// the last UID in the mailbox. It happens because
132-
// uid:* is interpreted the same way as *:uid.
133-
// See <https://tools.ietf.org/html/rfc3501#page-61> for
134-
// standard reference. Therefore, sometimes we receive
135-
// already seen messages and have to filter them out.
136-
if msg_uid >= uid_next {
137-
msgs.insert((msg.internal_date(), msg_uid), msg);
138-
}
131+
msgs.insert((msg.internal_date(), msg_uid), msg);
139132
}
140133
}
141134

0 commit comments

Comments
 (0)