Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pytest.local.ini
/evals
/tests/testdata/Episode_53_Answer_results.json
/tests/testdata/Episode_53_Search_results.json
/tests/testdata/email-mbox

# Email demo
/tools/gmail/client_secret.json
Expand All @@ -41,3 +42,4 @@ pytest.local.ini

# Monty Python demo
/examples/testdata/MP

65 changes: 59 additions & 6 deletions src/typeagent/emails/email_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from email.header import decode_header, make_header
from email.message import Message
from email.utils import parsedate_to_datetime
import mailbox
from pathlib import Path
import re
from typing import Iterable
Expand All @@ -20,6 +21,18 @@ def decode_encoded_words(value: str) -> str:
return str(make_header(decode_header(value)))


def _header_to_str(value: object) -> str | None:
"""Coerce an email header value to str or None.

msg.get() can return an email.header.Header object instead of a plain str
when the header contains RFC 2047 encoded words. Pydantic expects str, so
we normalise here.
"""
if value is None:
return None
return str(value)


def import_emails_from_dir(
dir_path: str, max_chunk_length: int = 4096
) -> Iterable[EmailMessage]:
Expand All @@ -41,6 +54,39 @@ def import_email_from_file(
return email


def import_emails_from_mbox(
mbox_path: str, max_chunk_length: int = 4096
) -> Iterable[tuple[int, EmailMessage]]:
"""Import emails from an mbox file.

Args:
mbox_path: Path to the mbox file.
max_chunk_length: Maximum length of each text chunk.

Yields:
Tuples of (message_index, EmailMessage) for each email in the mbox file.
The message_index is 0-based.
"""
mbox = mailbox.mbox(mbox_path)
for i, message in enumerate(mbox):
email = import_email_message(message, max_chunk_length)
email.src_url = f"{mbox_path}:{i}"
yield i, email


def count_emails_in_mbox(mbox_path: str) -> int:
"""Count the number of emails in an mbox file.

Args:
mbox_path: Path to the mbox file.

Returns:
The number of emails in the mbox file.
"""
mbox = mailbox.mbox(mbox_path)
return len(mbox)


# Imports a single email MIME string and returns an EmailMessage object
def import_email_string(
email_string: str, max_chunk_length: int = 4096
Expand All @@ -64,14 +110,16 @@ def import_forwarded_email_string(
# Imports an email.message.Message object and returns an EmailMessage object
# If the message is a reply, returns only the latest response.
def import_email_message(msg: Message, max_chunk_length: int) -> EmailMessage:
# Extract metadata from
# Extract metadata from headers.
# msg.get() can return a Header object instead of str for encoded headers,
# so coerce all values to str.
email_meta = EmailMessageMeta(
sender=msg.get("From", ""),
sender=str(msg.get("From", "")),
recipients=_import_address_headers(msg.get_all("To", [])),
cc=_import_address_headers(msg.get_all("Cc", [])),
bcc=_import_address_headers(msg.get_all("Bcc", [])),
subject=msg.get("Subject"), # TODO: Remove newlines
id=msg.get("Message-ID", None),
subject=_header_to_str(msg.get("Subject")),
id=_header_to_str(msg.get("Message-ID", None)),
)
timestamp: str | None = None
timestamp_date = msg.get("Date", None)
Expand Down Expand Up @@ -175,7 +223,12 @@ def _decode_email_payload(part: Message) -> str:
return payload
return ""
if isinstance(payload, bytes):
return payload.decode(part.get_content_charset() or "utf-8", errors="replace")
charset = part.get_content_charset() or "utf-8"
try:
return payload.decode(charset, errors="replace")
except LookupError:
# Unknown encoding (e.g. iso-8859-8-i); fall back to utf-8
return payload.decode("utf-8", errors="replace")
if isinstance(payload, str):
return payload
return ""
Expand All @@ -187,7 +240,7 @@ def _import_address_headers(headers: list[str]) -> list[str]:
unique_addresses: set[str] = set()
for header in headers:
if header:
addresses = _remove_empty_strings(header.split(","))
addresses = _remove_empty_strings(str(header).split(","))
for address in addresses:
unique_addresses.add(address)

Expand Down
Loading
Loading