Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pytest.local.ini
/evals
/tests/testdata/Episode_53_Answer_results.json
/tests/testdata/Episode_53_Search_results.json
/tests/testdata/email-mbox/kubuntu-users.mbox
/tests/testdata/email-mbox/ubuntu-devel.mbox

# Email demo
/tools/gmail/client_secret.json
Expand All @@ -41,3 +43,4 @@ pytest.local.ini

# Monty Python demo
/examples/testdata/MP

66 changes: 60 additions & 6 deletions src/typeagent/emails/email_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from email.header import decode_header, make_header
from email.message import Message
from email.utils import parsedate_to_datetime
import mailbox
from pathlib import Path
import re
from typing import Iterable
Expand All @@ -20,6 +21,18 @@ def decode_encoded_words(value: str) -> str:
return str(make_header(decode_header(value)))


def _header_to_str(value: object) -> str | None:
"""Coerce an email header value to str or None.

msg.get() can return an email.header.Header object instead of a plain str
when the header contains RFC 2047 encoded words. Pydantic expects str, so
we normalise here.
"""
if value is None:
return None
return str(value)


def import_emails_from_dir(
dir_path: str, max_chunk_length: int = 4096
) -> Iterable[EmailMessage]:
Expand All @@ -41,6 +54,39 @@ def import_email_from_file(
return email


def import_emails_from_mbox(
    mbox_path: str, max_chunk_length: int = 4096
) -> Iterable[tuple[int, EmailMessage]]:
    """Import emails from an mbox file.

    Args:
        mbox_path: Path to the mbox file.
        max_chunk_length: Maximum length of each text chunk.

    Yields:
        Tuples of (message_index, EmailMessage) for each email in the mbox file.
        The message_index is 0-based.

    Raises:
        mailbox.NoSuchMailboxError: If mbox_path does not exist. Because this
            is a generator, the error surfaces when iteration starts.
    """
    # create=False: mailbox.mbox() defaults to create=True, which would
    # silently create an empty file for a mistyped path and yield nothing.
    mbox = mailbox.mbox(mbox_path, create=False)
    try:
        for i, message in enumerate(mbox):
            email = import_email_message(message, max_chunk_length)
            email.src_url = f"{mbox_path}:{i}"
            yield i, email
    finally:
        # Release the underlying file handle even if the consumer stops
        # iterating early or a message fails to import.
        mbox.close()


def count_emails_in_mbox(mbox_path: str) -> int:
    """Count the number of emails in an mbox file.

    Args:
        mbox_path: Path to the mbox file.

    Returns:
        The number of emails in the mbox file.

    Raises:
        mailbox.NoSuchMailboxError: If mbox_path does not exist.
    """
    # create=False: mailbox.mbox() defaults to create=True, which would
    # silently create an empty file for a mistyped path and report 0.
    mbox = mailbox.mbox(mbox_path, create=False)
    try:
        # len() only builds the table of contents; it does not parse each
        # message the way iterating over the mailbox would.
        return len(mbox)
    finally:
        # Always release the underlying file handle.
        mbox.close()


# Imports a single email MIME string and returns an EmailMessage object
def import_email_string(
email_string: str, max_chunk_length: int = 4096
Expand All @@ -64,14 +110,16 @@ def import_forwarded_email_string(
# Imports an email.message.Message object and returns an EmailMessage object
# If the message is a reply, returns only the latest response.
def import_email_message(msg: Message, max_chunk_length: int) -> EmailMessage:
# Extract metadata from
# Extract metadata from headers.
# msg.get() can return a Header object instead of str for encoded headers,
# so coerce all values to str.
email_meta = EmailMessageMeta(
sender=msg.get("From", ""),
sender=str(msg.get("From", "")),
recipients=_import_address_headers(msg.get_all("To", [])),
cc=_import_address_headers(msg.get_all("Cc", [])),
bcc=_import_address_headers(msg.get_all("Bcc", [])),
subject=msg.get("Subject"), # TODO: Remove newlines
id=msg.get("Message-ID", None),
subject=_header_to_str(msg.get("Subject")),
id=_header_to_str(msg.get("Message-ID", None)),
)
timestamp: str | None = None
timestamp_date = msg.get("Date", None)
Expand Down Expand Up @@ -175,7 +223,13 @@ def _decode_email_payload(part: Message) -> str:
return payload
return ""
if isinstance(payload, bytes):
return payload.decode(part.get_content_charset() or "utf-8", errors="replace")
charset = part.get_content_charset() or "latin-1"
try:
return payload.decode(charset, errors="replace")
except LookupError:
# Unknown encoding (e.g. iso-8859-8-i); fall back to latin-1
# which accepts all 256 byte values without loss.
return payload.decode("latin-1")
if isinstance(payload, str):
return payload
return ""
Expand All @@ -187,7 +241,7 @@ def _import_address_headers(headers: list[str]) -> list[str]:
unique_addresses: set[str] = set()
for header in headers:
if header:
addresses = _remove_empty_strings(header.split(","))
addresses = _remove_empty_strings(str(header).split(","))
for address in addresses:
unique_addresses.add(address)

Expand Down
Loading