Skip to content

Commit df78109

Browse files
committed
refactor(email_handler): replace_headers_when_forward
1 parent 0e95f3d commit df78109

File tree

5 files changed

+410
-88
lines changed

5 files changed

+410
-88
lines changed

app/config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -686,3 +686,6 @@ def read_partner_dict(var: str) -> dict[int, str]:
686686
ALIAS_TRASH_DAYS = int(os.environ.get("ALIAS_TRASH_DAYS", 30))
687687
ALLOWED_OAUTH_SCHEMES = get_env_csv("ALLOWED_OAUTH_SCHEMES", "auth.simplelogin,https")
688688
MAX_EMAIL_FORWARD_RECIPIENTS = int(os.environ.get("MAX_EMAIL_FORWARD_RECIPIENTS", 30))
689+
MAX_CONTACTS_TO_CREATE_FOR_FORWARD = int(
690+
os.environ.get("MAX_CONTACTS_TO_CREATE_FOR_FORWARD", 30)
691+
)

app/email/forward_replacements.py

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
from dataclasses import dataclass
2+
from email.message import Message
3+
from itertools import batched
4+
from typing import List
5+
6+
from email_validator import validate_email, EmailNotValidError
7+
from flanker.addresslib import address
8+
from flanker.addresslib.address import EmailAddress
9+
from sqlalchemy import and_
10+
from sqlalchemy.exc import IntegrityError
11+
12+
from app.db import Session
13+
from app.email import headers
14+
from app.email_utils import (
15+
get_header_unicode,
16+
add_or_replace_header,
17+
delete_header,
18+
generate_reply_email,
19+
)
20+
from app.log import LOG
21+
from app.models import Alias, Contact
22+
from app.utils import sanitize_email
23+
24+
_IN_BATCH_SIZE = 50
25+
26+
27+
@dataclass
28+
class MessageContacts:
29+
existing: dict[str, Contact]
30+
non_existing_to: set[EmailAddress]
31+
non_existing_cc: set[EmailAddress]
32+
33+
34+
@dataclass
35+
class HeaderReplacement:
36+
header: str
37+
replacement: str
38+
39+
40+
@dataclass
41+
class Replacements:
42+
contacts_to_create: List[Contact]
43+
contacts_to_update: List[Contact]
44+
headers_to_delete: List[str]
45+
headers_to_replace: List[HeaderReplacement]
46+
47+
def __init__(self):
48+
self.contacts_to_create = []
49+
self.headers_to_delete = []
50+
self.contacts_to_update = []
51+
self.headers_to_replace = []
52+
53+
54+
def _get_addresses_for_headers(
55+
msg: Message, message_headers: List[str]
56+
) -> dict[str, set[EmailAddress]]:
57+
addresses: dict[str, set[EmailAddress]] = {h: set() for h in message_headers}
58+
for header in message_headers:
59+
header_value = msg.get_all(header, [])
60+
header_value = [get_header_unicode(h) for h in header_value]
61+
62+
for value in header_value:
63+
for parsed in address.parse_list(value):
64+
addresses[header].add(parsed)
65+
66+
return addresses
67+
68+
69+
def _contacts_for_message(msg: Message, alias: Alias) -> MessageContacts:
70+
addresses = _get_addresses_for_headers(msg, [headers.TO, headers.CC])
71+
72+
to_addresses = addresses[headers.TO]
73+
cc_addresses = addresses[headers.CC]
74+
75+
all_addresses_set = set()
76+
all_addresses_set.update(to_addresses)
77+
all_addresses_set.update(cc_addresses)
78+
all_addresses = list(all_addresses_set)
79+
80+
existing_contacts: dict[str, Contact] = {}
81+
non_existing_cc: set[EmailAddress] = set()
82+
non_existing_to: set[EmailAddress] = set()
83+
for chunk in batched(all_addresses, _IN_BATCH_SIZE):
84+
chunk_addresses: List[EmailAddress] = [add.address for add in chunk]
85+
chunk_contacts = Contact.filter(
86+
and_(
87+
Contact.alias_id == alias.id, Contact.website_email.in_(chunk_addresses)
88+
)
89+
).all()
90+
91+
for contact in chunk_contacts:
92+
existing_contacts[contact.email] = contact
93+
94+
if len(chunk_addresses) != len(chunk_contacts):
95+
# Check which ones are missing
96+
for chunk_address in chunk_addresses:
97+
if chunk_address not in existing_contacts:
98+
if chunk_address in to_addresses:
99+
non_existing_to.add(chunk_address)
100+
elif chunk_address in cc_addresses:
101+
non_existing_cc.add(chunk_address)
102+
103+
return MessageContacts(
104+
existing=existing_contacts,
105+
non_existing_to=non_existing_to,
106+
non_existing_cc=non_existing_cc,
107+
)
108+
109+
110+
def _calculate_replacements_for_header(
111+
msg: Message,
112+
alias: Alias,
113+
header: str,
114+
contacts: dict[str, Contact],
115+
replacements: Replacements,
116+
):
117+
"""
118+
Replace CC or To header by Reply emails in forward phase
119+
"""
120+
new_addrs: [str] = []
121+
headers = msg.get_all(header, [])
122+
# headers can be an array of Header, convert it to string here
123+
headers = [get_header_unicode(h) for h in headers]
124+
125+
full_addresses: [EmailAddress] = []
126+
for h in headers:
127+
full_addresses += address.parse_list(h)
128+
129+
for full_address in full_addresses:
130+
contact_email = sanitize_email(full_address.address, not_lower=True)
131+
132+
# no transformation when alias is already in the header
133+
if contact_email.lower() == alias.email:
134+
new_addrs.append(full_address.full_spec())
135+
continue
136+
137+
try:
138+
# NOT allow unicode for contact address
139+
validate_email(
140+
contact_email, check_deliverability=False, allow_smtputf8=False
141+
)
142+
except EmailNotValidError:
143+
LOG.w("invalid contact email %s. %s. Skip", contact_email, headers)
144+
continue
145+
146+
contact_name = full_address.display_name
147+
if len(contact_name) >= Contact.MAX_NAME_LENGTH:
148+
contact_name = contact_name[0 : Contact.MAX_NAME_LENGTH]
149+
150+
contact = contacts.get(contact_email, None)
151+
if contact:
152+
# update the contact name if needed
153+
if contact.name != full_address.display_name:
154+
LOG.d(
155+
"Update contact %s name %s to %s",
156+
contact,
157+
contact.name,
158+
contact_name,
159+
)
160+
contact.name = contact_name
161+
replacements.contacts_to_update.append(contact)
162+
else:
163+
LOG.d(
164+
"create contact for alias %s and email %s, header %s",
165+
alias,
166+
contact_email,
167+
header,
168+
)
169+
170+
try:
171+
contact = Contact.create(
172+
user_id=alias.user_id,
173+
alias_id=alias.id,
174+
website_email=contact_email,
175+
name=contact_name,
176+
reply_email=generate_reply_email(contact_email, alias),
177+
is_cc=header.lower() == "cc",
178+
automatic_created=True,
179+
)
180+
replacements.contacts_to_create.append(contact)
181+
except IntegrityError:
182+
LOG.w("Contact %s %s already exist", alias, contact_email)
183+
Session.rollback()
184+
contact = Contact.get_by(alias_id=alias.id, website_email=contact_email)
185+
186+
new_addrs.append(contact.new_addr())
187+
188+
if new_addrs:
189+
new_header = ",".join(new_addrs)
190+
LOG.d("Replace %s header, old: %s, new: %s", header, msg[header], new_header)
191+
replacements.headers_to_replace.append(
192+
HeaderReplacement(header=header, replacement=new_header)
193+
)
194+
else:
195+
LOG.d("Delete %s header, old value %s", header, msg[header])
196+
replacements.headers_to_delete.append(header)
197+
198+
199+
def calculate_forward_replacements(
200+
message: Message, alias: Alias, contacts: dict[str, Contact]
201+
) -> Replacements:
202+
replacements = Replacements()
203+
_calculate_replacements_for_header(
204+
message, alias, headers.TO, contacts, replacements
205+
)
206+
_calculate_replacements_for_header(
207+
message, alias, headers.CC, contacts, replacements
208+
)
209+
return replacements
210+
211+
212+
def replace_headers_when_forward(
213+
message: Message, alias: Alias, max_contacts_to_create_limit: int
214+
) -> bool:
215+
contacts = _contacts_for_message(message, alias)
216+
217+
total_contacts_to_create = len(contacts.non_existing_to) + len(
218+
contacts.non_existing_cc
219+
)
220+
if total_contacts_to_create > max_contacts_to_create_limit:
221+
LOG.i(
222+
f"Would have tried to create {total_contacts_to_create} contacts, but only {max_contacts_to_create_limit} allowed"
223+
)
224+
return False
225+
226+
replacements = calculate_forward_replacements(message, alias, contacts.existing)
227+
228+
if len(replacements.contacts_to_create) > max_contacts_to_create_limit:
229+
return False
230+
231+
for replacement in replacements.headers_to_replace:
232+
add_or_replace_header(message, replacement.header, replacement.replacement)
233+
234+
for header in replacements.headers_to_delete:
235+
delete_header(message, header)
236+
237+
return True

app/models.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2117,7 +2117,7 @@ def new_addr(self):
21172117
And return new address with RFC 2047 format
21182118
"""
21192119
user = self.user
2120-
sender_format = user.sender_format if user else SenderFormatEnum.AT.value
2120+
sender_format = int(user.sender_format) if user else SenderFormatEnum.AT.value
21212121

21222122
if sender_format == SenderFormatEnum.NO_NAME.value:
21232123
return self.reply_email

email_handler.py

Lines changed: 7 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,6 @@
4848
import newrelic.agent
4949
from aiosmtpd.controller import Controller
5050
from aiosmtpd.smtp import Envelope
51-
from email_validator import validate_email, EmailNotValidError
52-
from flanker.addresslib import address
53-
from flanker.addresslib.address import EmailAddress
5451
from sqlalchemy.exc import IntegrityError
5552

5653
from app import pgp_utils, s3, config, contact_utils
@@ -87,10 +84,12 @@
8784
ALERT_FROM_ADDRESS_IS_REVERSE_ALIAS,
8885
ALERT_TO_NOREPLY,
8986
MAX_EMAIL_FORWARD_RECIPIENTS,
87+
MAX_CONTACTS_TO_CREATE_FOR_FORWARD,
9088
)
9189
from app.db import Session
9290
from app.email import status, headers
9391
from app.email.checks import check_recipient_limit
92+
from app.email.forward_replacements import replace_headers_when_forward
9493
from app.email.rate_limit import rate_limited
9594
from app.email.spam import get_spam_score
9695
from app.email_utils import (
@@ -111,7 +110,6 @@
111110
should_add_dkim_signature,
112111
add_header,
113112
get_header_unicode,
114-
generate_reply_email,
115113
is_reverse_alias,
116114
replace,
117115
should_disable,
@@ -245,87 +243,6 @@ def get_or_create_reply_to_contact(
245243
return contact_utils.create_contact(contact_address, alias, contact_name).contact
246244

247245

248-
def replace_header_when_forward(msg: Message, alias: Alias, header: str):
249-
"""
250-
Replace CC or To header by Reply emails in forward phase
251-
"""
252-
new_addrs: [str] = []
253-
headers = msg.get_all(header, [])
254-
# headers can be an array of Header, convert it to string here
255-
headers = [get_header_unicode(h) for h in headers]
256-
257-
full_addresses: [EmailAddress] = []
258-
for h in headers:
259-
full_addresses += address.parse_list(h)
260-
261-
for full_address in full_addresses:
262-
contact_email = sanitize_email(full_address.address, not_lower=True)
263-
264-
# no transformation when alias is already in the header
265-
if contact_email.lower() == alias.email:
266-
new_addrs.append(full_address.full_spec())
267-
continue
268-
269-
try:
270-
# NOT allow unicode for contact address
271-
validate_email(
272-
contact_email, check_deliverability=False, allow_smtputf8=False
273-
)
274-
except EmailNotValidError:
275-
LOG.w("invalid contact email %s. %s. Skip", contact_email, headers)
276-
continue
277-
278-
contact = Contact.get_by(alias_id=alias.id, website_email=contact_email)
279-
contact_name = full_address.display_name
280-
if len(contact_name) >= Contact.MAX_NAME_LENGTH:
281-
contact_name = contact_name[0 : Contact.MAX_NAME_LENGTH]
282-
283-
if contact:
284-
# update the contact name if needed
285-
if contact.name != full_address.display_name:
286-
LOG.d(
287-
"Update contact %s name %s to %s",
288-
contact,
289-
contact.name,
290-
contact_name,
291-
)
292-
contact.name = contact_name
293-
Session.commit()
294-
else:
295-
LOG.d(
296-
"create contact for alias %s and email %s, header %s",
297-
alias,
298-
contact_email,
299-
header,
300-
)
301-
302-
try:
303-
contact = Contact.create(
304-
user_id=alias.user_id,
305-
alias_id=alias.id,
306-
website_email=contact_email,
307-
name=contact_name,
308-
reply_email=generate_reply_email(contact_email, alias),
309-
is_cc=header.lower() == "cc",
310-
automatic_created=True,
311-
)
312-
Session.commit()
313-
except IntegrityError:
314-
LOG.w("Contact %s %s already exist", alias, contact_email)
315-
Session.rollback()
316-
contact = Contact.get_by(alias_id=alias.id, website_email=contact_email)
317-
318-
new_addrs.append(contact.new_addr())
319-
320-
if new_addrs:
321-
new_header = ",".join(new_addrs)
322-
LOG.d("Replace %s header, old: %s, new: %s", header, msg[header], new_header)
323-
add_or_replace_header(msg, header, new_header)
324-
else:
325-
LOG.d("Delete %s header, old value %s", header, msg[header])
326-
delete_header(msg, header)
327-
328-
329246
def add_alias_to_header_if_needed(msg, alias):
330247
"""
331248
During the forward phase, add alias to To: header if it isn't included in To and Cc header
@@ -919,8 +836,11 @@ def forward_email_to_mailbox(
919836

920837
# replace CC & To emails by reverse-alias for all emails that are not alias
921838
try:
922-
replace_header_when_forward(msg, alias, headers.CC)
923-
replace_header_when_forward(msg, alias, headers.TO)
839+
if not replace_headers_when_forward(
840+
msg, alias, MAX_CONTACTS_TO_CREATE_FOR_FORWARD
841+
):
842+
Session.rollback()
843+
return False, status.E526
924844
except CannotCreateContactForReverseAlias:
925845
LOG.d("CannotCreateContactForReverseAlias error, delete %s", email_log)
926846
EmailLog.delete(email_log.id)

0 commit comments

Comments
 (0)