diff --git a/app/config.py b/app/config.py index 64ae82b6e..5dc0baeea 100644 --- a/app/config.py +++ b/app/config.py @@ -686,3 +686,6 @@ def read_partner_dict(var: str) -> dict[int, str]: ALIAS_TRASH_DAYS = int(os.environ.get("ALIAS_TRASH_DAYS", 30)) ALLOWED_OAUTH_SCHEMES = get_env_csv("ALLOWED_OAUTH_SCHEMES", "auth.simplelogin,https") MAX_EMAIL_FORWARD_RECIPIENTS = int(os.environ.get("MAX_EMAIL_FORWARD_RECIPIENTS", 30)) +MAX_CONTACTS_TO_CREATE_FOR_FORWARD = int( + os.environ.get("MAX_CONTACTS_TO_CREATE_FOR_FORWARD", 30) +) diff --git a/app/email/forward_replacements.py b/app/email/forward_replacements.py new file mode 100644 index 000000000..166bad16b --- /dev/null +++ b/app/email/forward_replacements.py @@ -0,0 +1,237 @@ +from dataclasses import dataclass +from email.message import Message +from itertools import batched +from typing import List + +from email_validator import validate_email, EmailNotValidError +from flanker.addresslib import address +from flanker.addresslib.address import EmailAddress +from sqlalchemy import and_ +from sqlalchemy.exc import IntegrityError + +from app.db import Session +from app.email import headers +from app.email_utils import ( + get_header_unicode, + add_or_replace_header, + delete_header, + generate_reply_email, +) +from app.log import LOG +from app.models import Alias, Contact +from app.utils import sanitize_email + +_IN_BATCH_SIZE = 50 + + +@dataclass +class MessageContacts: + existing: dict[str, Contact] + non_existing_to: set[EmailAddress] + non_existing_cc: set[EmailAddress] + + +@dataclass +class HeaderReplacement: + header: str + replacement: str + + +@dataclass +class Replacements: + contacts_to_create: List[Contact] + contacts_to_update: List[Contact] + headers_to_delete: List[str] + headers_to_replace: List[HeaderReplacement] + + def __init__(self): + self.contacts_to_create = [] + self.headers_to_delete = [] + self.contacts_to_update = [] + self.headers_to_replace = [] + + +def _get_addresses_for_headers( + msg: Message, message_headers: List[str] +) -> dict[str, set[EmailAddress]]: + addresses: dict[str, set[EmailAddress]] = {h: set() for h in message_headers} + for header in message_headers: + header_value = msg.get_all(header, []) + header_value = [get_header_unicode(h) for h in header_value] + + for value in header_value: + for parsed in address.parse_list(value): + addresses[header].add(parsed) + + return addresses + + +def _contacts_for_message(msg: Message, alias: Alias) -> MessageContacts: + addresses = _get_addresses_for_headers(msg, [headers.TO, headers.CC]) + + to_addresses = addresses[headers.TO] + cc_addresses = addresses[headers.CC] + + all_addresses_set = set() + all_addresses_set.update(to_addresses) + all_addresses_set.update(cc_addresses) + all_addresses = list(all_addresses_set) + + existing_contacts: dict[str, Contact] = {} + non_existing_cc: set[EmailAddress] = set() + non_existing_to: set[EmailAddress] = set() + for chunk in batched(all_addresses, _IN_BATCH_SIZE): + chunk_addresses: List[EmailAddress] = [add.address for add in chunk] + chunk_contacts = Contact.filter( + and_( + Contact.alias_id == alias.id, Contact.website_email.in_(chunk_addresses) + ) + ).all() + + for contact in chunk_contacts: + existing_contacts[contact.email] = contact + + if len(chunk_addresses) != len(chunk_contacts): + # Check which ones are missing + for chunk_address in chunk_addresses: + if chunk_address not in existing_contacts: + if chunk_address in to_addresses: + non_existing_to.add(chunk_address) + elif chunk_address in cc_addresses: + non_existing_cc.add(chunk_address) + + return MessageContacts( + existing=existing_contacts, + non_existing_to=non_existing_to, + non_existing_cc=non_existing_cc, + ) + + +def _calculate_replacements_for_header( + msg: Message, + alias: Alias, + header: str, + contacts: dict[str, Contact], + replacements: Replacements, +): + """ + Replace CC or To header by Reply emails in forward phase + """ + new_addrs: [str] = [] + headers = msg.get_all(header, []) + # headers can be an array of Header, convert it to string here + headers = [get_header_unicode(h) for h in headers] + + full_addresses: [EmailAddress] = [] + for h in headers: + full_addresses += address.parse_list(h) + + for full_address in full_addresses: + contact_email = sanitize_email(full_address.address, not_lower=True) + + # no transformation when alias is already in the header + if contact_email.lower() == alias.email: + new_addrs.append(full_address.full_spec()) + continue + + try: + # NOT allow unicode for contact address + validate_email( + contact_email, check_deliverability=False, allow_smtputf8=False + ) + except EmailNotValidError: + LOG.w("invalid contact email %s. %s. Skip", contact_email, headers) + continue + + contact_name = full_address.display_name + if len(contact_name) >= Contact.MAX_NAME_LENGTH: + contact_name = contact_name[0 : Contact.MAX_NAME_LENGTH] + + contact = contacts.get(contact_email, None) + if contact: + # update the contact name if needed + if contact.name != full_address.display_name: + LOG.d( + "Update contact %s name %s to %s", + contact, + contact.name, + contact_name, + ) + contact.name = contact_name + replacements.contacts_to_update.append(contact) + else: + LOG.d( + "create contact for alias %s and email %s, header %s", + alias, + contact_email, + header, + ) + + try: + contact = Contact.create( + user_id=alias.user_id, + alias_id=alias.id, + website_email=contact_email, + name=contact_name, + reply_email=generate_reply_email(contact_email, alias), + is_cc=header.lower() == "cc", + automatic_created=True, + ) + replacements.contacts_to_create.append(contact) + except IntegrityError: + LOG.w("Contact %s %s already exist", alias, contact_email) + Session.rollback() + contact = Contact.get_by(alias_id=alias.id, website_email=contact_email) + + new_addrs.append(contact.new_addr()) + + if new_addrs: + new_header = ",".join(new_addrs) + LOG.d("Replace %s header, old: %s, new: %s", header, msg[header], new_header) + replacements.headers_to_replace.append( + HeaderReplacement(header=header, replacement=new_header) + ) + else: + LOG.d("Delete %s header, old value %s", header, msg[header]) + replacements.headers_to_delete.append(header) + + +def calculate_forward_replacements( + message: Message, alias: Alias, contacts: dict[str, Contact] +) -> Replacements: + replacements = Replacements() + _calculate_replacements_for_header( + message, alias, headers.TO, contacts, replacements + ) + _calculate_replacements_for_header( + message, alias, headers.CC, contacts, replacements + ) + return replacements + + +def replace_headers_when_forward( + message: Message, alias: Alias, max_contacts_to_create_limit: int +) -> bool: + contacts = _contacts_for_message(message, alias) + + total_contacts_to_create = len(contacts.non_existing_to) + len( + contacts.non_existing_cc + ) + if total_contacts_to_create > max_contacts_to_create_limit: + LOG.i( + f"Would have tried to create {total_contacts_to_create} contacts, but only {max_contacts_to_create_limit} allowed" + ) + return False + + replacements = calculate_forward_replacements(message, alias, contacts.existing) + + if len(replacements.contacts_to_create) > max_contacts_to_create_limit: + return False + + for replacement in replacements.headers_to_replace: + add_or_replace_header(message, replacement.header, replacement.replacement) + + for header in replacements.headers_to_delete: + delete_header(message, header) + + return True diff --git a/app/models.py b/app/models.py index d517bd7ee..50bacfa88 100644 --- a/app/models.py +++ b/app/models.py @@ -2117,7 +2117,7 @@ def new_addr(self): And return new address with RFC 2047 format """ user = self.user - sender_format = user.sender_format if user else SenderFormatEnum.AT.value + sender_format = int(user.sender_format) if user else SenderFormatEnum.AT.value if sender_format == SenderFormatEnum.NO_NAME.value: return self.reply_email diff --git a/email_handler.py b/email_handler.py index 123062048..3022d71c3 100644 --- a/email_handler.py +++ b/email_handler.py @@ -48,9 +48,6 @@ import newrelic.agent from aiosmtpd.controller import Controller from aiosmtpd.smtp import Envelope -from email_validator import validate_email, EmailNotValidError -from flanker.addresslib import address -from flanker.addresslib.address import EmailAddress from sqlalchemy.exc import IntegrityError from app import pgp_utils, s3, config, contact_utils @@ -87,10 +84,12 @@ ALERT_FROM_ADDRESS_IS_REVERSE_ALIAS, ALERT_TO_NOREPLY, MAX_EMAIL_FORWARD_RECIPIENTS, + MAX_CONTACTS_TO_CREATE_FOR_FORWARD, ) from app.db import Session from app.email import status, headers from app.email.checks import check_recipient_limit +from app.email.forward_replacements import replace_headers_when_forward from app.email.rate_limit import rate_limited from app.email.spam import get_spam_score from app.email_utils import ( @@ -111,7 +110,6 @@ should_add_dkim_signature, add_header, get_header_unicode, - generate_reply_email, is_reverse_alias, replace, should_disable, @@ -245,87 +243,6 @@ def get_or_create_reply_to_contact( return contact_utils.create_contact(contact_address, alias, contact_name).contact -def replace_header_when_forward(msg: Message, alias: Alias, header: str): - """ - Replace CC or To header by Reply emails in forward phase - """ - new_addrs: [str] = [] - headers = msg.get_all(header, []) - # headers can be an array of Header, convert it to string here - headers = [get_header_unicode(h) for h in headers] - - full_addresses: [EmailAddress] = [] - for h in headers: - full_addresses += address.parse_list(h) - - for full_address in full_addresses: - contact_email = sanitize_email(full_address.address, not_lower=True) - - # no transformation when alias is already in the header - if contact_email.lower() == alias.email: - new_addrs.append(full_address.full_spec()) - continue - - try: - # NOT allow unicode for contact address - validate_email( - contact_email, check_deliverability=False, allow_smtputf8=False - ) - except EmailNotValidError: - LOG.w("invalid contact email %s. %s. Skip", contact_email, headers) - continue - - contact = Contact.get_by(alias_id=alias.id, website_email=contact_email) - contact_name = full_address.display_name - if len(contact_name) >= Contact.MAX_NAME_LENGTH: - contact_name = contact_name[0 : Contact.MAX_NAME_LENGTH] - - if contact: - # update the contact name if needed - if contact.name != full_address.display_name: - LOG.d( - "Update contact %s name %s to %s", - contact, - contact.name, - contact_name, - ) - contact.name = contact_name - Session.commit() - else: - LOG.d( - "create contact for alias %s and email %s, header %s", - alias, - contact_email, - header, - ) - - try: - contact = Contact.create( - user_id=alias.user_id, - alias_id=alias.id, - website_email=contact_email, - name=contact_name, - reply_email=generate_reply_email(contact_email, alias), - is_cc=header.lower() == "cc", - automatic_created=True, - ) - Session.commit() - except IntegrityError: - LOG.w("Contact %s %s already exist", alias, contact_email) - Session.rollback() - contact = Contact.get_by(alias_id=alias.id, website_email=contact_email) - - new_addrs.append(contact.new_addr()) - - if new_addrs: - new_header = ",".join(new_addrs) - LOG.d("Replace %s header, old: %s, new: %s", header, msg[header], new_header) - add_or_replace_header(msg, header, new_header) - else: - LOG.d("Delete %s header, old value %s", header, msg[header]) - delete_header(msg, header) - - def add_alias_to_header_if_needed(msg, alias): """ During the forward phase, add alias to To: header if it isn't included in To and Cc header @@ -919,8 +836,11 @@ def forward_email_to_mailbox( # replace CC & To emails by reverse-alias for all emails that are not alias try: - replace_header_when_forward(msg, alias, headers.CC) - replace_header_when_forward(msg, alias, headers.TO) + if not replace_headers_when_forward( + msg, alias, MAX_CONTACTS_TO_CREATE_FOR_FORWARD + ): + Session.rollback() + return False, status.E526 except CannotCreateContactForReverseAlias: LOG.d("CannotCreateContactForReverseAlias error, delete %s", email_log) EmailLog.delete(email_log.id) diff --git a/tests/email_tests/test_forward_replacements.py b/tests/email_tests/test_forward_replacements.py new file mode 100644 index 000000000..af2c13bcc --- /dev/null +++ b/tests/email_tests/test_forward_replacements.py @@ -0,0 +1,162 @@ +from email.message import Message +from typing import List, Optional + + +from app.db import Session +from app.email import headers +from app.email.forward_replacements import replace_headers_when_forward +from app.email_utils import generate_reply_email +from app.models import Alias, Contact +from app.utils import random_string +from tests.utils import create_new_user, random_email + + +def _emails_to_header(emails: List[str]) -> str: + return ", ".join(emails) + + +def _email_list(size: int, initial: Optional[List[str]] = None) -> str: + emails = initial or [] + for i in range(size): + emails.append(random_email()) + + return _emails_to_header(emails) + + +def _contacts_for_alias(alias: Alias) -> List[Contact]: + return Contact.filter_by(alias_id=alias.id).order_by(Contact.id.asc()).all() + + +def _create_contact_for_alias(email: str, name: str, alias: Alias) -> Contact: + return Contact.create( + user_id=alias.user_id, + alias_id=alias.id, + website_email=email, + name=name, + reply_email=generate_reply_email(email, alias), + is_cc=False, + automatic_created=True, + ) + + +def test_does_nothing_on_empty_message(): + user = create_new_user() + alias = Alias.create_new_random(user) + res = replace_headers_when_forward(Message(), alias, 10) + assert res is True + + +def test_does_nothing_with_empty_to_and_cc(): + user = create_new_user() + alias = Alias.create_new_random(user) + message = Message() + message[headers.TO] = _email_list(0) + message[headers.CC] = _email_list(0) + + res = replace_headers_when_forward(message, alias, 10) + assert res is True + + +def test_does_not_create_contacts_if_already_exist(): + user = create_new_user() + alias = Alias.create_new_random(user) + + contact_email1 = random_email() + contact_name_1 = random_string() + contact_email2 = random_email() + contact_name_2 = random_string() + existing_contact1 = _create_contact_for_alias( + email=contact_email1, name=contact_name_1, alias=alias + ) + existing_contact2 = _create_contact_for_alias( + email=contact_email2, name=contact_name_2, alias=alias + ) + + message_to = _email_list(0, [contact_email1]) + message_cc = _email_list(0, [contact_email2]) + + message = Message() + message[headers.TO] = message_to + message[headers.CC] = message_cc + + assert len(_contacts_for_alias(alias)) == 2 + + res = replace_headers_when_forward(message, alias, 10) + assert res is True + Session.commit() + + # Assert no new contacts have been created + contacts_for_alias = _contacts_for_alias(alias) + assert len(contacts_for_alias) == 2 + assert contacts_for_alias[0].id == existing_contact1.id + assert contacts_for_alias[1].id == existing_contact2.id + + # Assert headers + assert message[headers.TO] == existing_contact1.new_addr() + assert message[headers.CC] == existing_contact2.new_addr() + + +def test_only_creates_contacts_that_did_not_exist(): + user = create_new_user() + alias = Alias.create_new_random(user) + + contact_email = random_email() + contact_name = random_string() + existing_contact = _create_contact_for_alias( + email=contact_email, name=contact_name, alias=alias + ) + + new_email = random_email() + message_to = _emails_to_header([contact_email, new_email]) + + message = Message() + message[headers.TO] = message_to + message[headers.CC] = _emails_to_header([]) + + assert len(_contacts_for_alias(alias)) == 1 + + res = replace_headers_when_forward(message, alias, 10) + assert res is True + Session.commit() + + # Assert 1 new contact has been created + contacts_for_alias = _contacts_for_alias(alias) + assert len(contacts_for_alias) == 2 + assert contacts_for_alias[0].id == existing_contact.id + assert contacts_for_alias[1].website_email == new_email + + # Assert headers + assert message[headers.TO] == ",".join( + [contacts_for_alias[0].new_addr(), contacts_for_alias[1].new_addr()] + ) + + # Empty header gets removed + assert message[headers.CC] is None + + +def test_cannot_create_more_contacts_than_allowed(): + user = create_new_user() + alias = Alias.create_new_random(user) + + contact_email = random_email() + contact_name = random_string() + _existing_contact = _create_contact_for_alias( + email=contact_email, name=contact_name, alias=alias + ) + + contacts_to_create = 2 + max_contacts_to_create_limit = 1 + + message_to = _emails_to_header( + [contact_email, *[random_email() for _ in range(contacts_to_create)]] + ) + + message = Message() + message[headers.TO] = message_to + message[headers.CC] = _emails_to_header([]) + + assert len(_contacts_for_alias(alias)) == 1 + + # Would try to create 2 but can only create 1 + res = replace_headers_when_forward(message, alias, max_contacts_to_create_limit) + assert res is False