-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathemail_alert.py
194 lines (164 loc) · 7.03 KB
/
email_alert.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
#!/usr/local/bin/python3
"""Superclass for email pub alerts.
"""
# import ssl
import inspect
import sys
import getpass
import imaplib # Email protocol
import base64
import quopri
import re
import alert
# Module-level constants. Each email alert source module is expected to
# provide its own values for these.
IS_EMAIL_SOURCE = True      # as opposed to, say, RSS
SENDERS = None              # list of sender email addresses
SOURCE_NAME_TEXT = None     # e.g. "ScienceDirect Email"

# IMAP FETCH item specifications. The PEEK form reads the data without
# implicitly setting the \Seen flag on the message (RFC 3501).
HEADER_PARTS = (
    "(BODY.PEEK[HEADER.FIELDS "
    "(From Subject Content-Transfer-Encoding)])"
)
BODY_PARTS = "(BODY.PEEK[TEXT])"

# Finds a Content-Transfer-Encoding declaration inside raw (bytes) text and
# captures the encoding name.
ENCODING_RE = re.compile(rb"Content-Transfer-Encoding: ([\w-]+)")
class Email(object):
    """Abstraction of a single IMAP email.

    Attributes:
        header: the raw header fetch response, exactly as passed in.
        body: the raw body fetch response, exactly as passed in.
        sender: text following "From: " in the header, or None when the
            header has no such line.
        subject: text following "Subject: " in the header, or None when the
            header has no such line.
        encoding: the Content-Transfer-Encoding, taken from the header or,
            failing that, from the first declaration found in the body.
            None when neither has one.
        body_text: the decoded body. A str for quoted-printable, binary,
            7bit and 8bit content; bytes for base64 content (it is not
            decoded to text); None when the encoding is not recognised.
    """

    def __init__(self, header, body):
        """Parse the fetched header and decode the fetched body.

        Args:
            header: fetch response for the header fields; the header bytes
                are read from header[0][1].
            body: fetch response for the message text; the body bytes are
                read from body[0][1].

        An unrecognised (or missing) transfer encoding is reported on
        stderr and leaves body_text as None; it does not raise.
        """
        self.header = header
        self.body = body

        # Give every attribute a defined value up front so that a missing
        # header line cannot cause an AttributeError later on.
        self.sender = None
        self.subject = None
        self.encoding = None
        self.body_text = None

        header_text = self.header[0][1].decode("utf-8")
        for line in header_text.split("\r\n"):
            if line.startswith("From: "):
                self.sender = line[6:]
            elif line.startswith("Subject: "):
                self.subject = line[9:]
            elif line.startswith("Content-Transfer-Encoding: "):
                self.encoding = line[27:]

        if self.encoding is None:
            # Sometimes the encoding is stored in the body for multi-part
            # messages. The body is at [0][1]; use the first encoding found.
            found = ENCODING_RE.search(self.body[0][1])
            if found is not None:
                self.encoding = found.group(1).decode("utf-8")

        # Decode the email body.
        if self.encoding == "base64":
            # NOTE: this branch yields bytes, unlike the str produced by the
            # other branches.
            self.body_text = base64.standard_b64decode(self.body[0][1])
        elif self.encoding in ("quoted-printable", "binary"):
            # Binary appears in NCBI emails, but they lie, I think.
            # TODO: Need to get UTF encoding from email header as well.
            self.body_text = quopri.decodestring(
                self.body[0][1]).decode("utf-8")
        elif self.encoding in ("7bit", "8bit"):
            self.body_text = self.body[0][1].decode("utf-8")
        else:
            # Report the encoding that was actually seen, and keep both
            # lines of the diagnostic on stderr.
            print(
                "ERROR: Unrecognized Content-Transfer-Encoding: "
                + "{0}".format(self.encoding), file=sys.stderr)
            print(
                " for email with subject: {0}".format(self.subject),
                file=sys.stderr)
class EmailAlert(alert.Alert):
    """Common base class for every kind of email alert.

    Each specific email alert type subclasses this.
    """

    def __init__(self):
        """Declare the attributes shared by email alerts and set defaults."""
        # The Alert.
        self._alert = None
        # List of PubAlerts generated from this alert.
        self.pub_alerts = None
        # str: the search this alert is for.
        self.search = None
        # str: body of the email.
        self._email_body_text = None
        # Whether a warning should be issued when the alert has no pubs.
        self.warn_if_empty = True

    def get_search_text_with_alert_source(self):
        """Return the search text/name prefixed with the alert's source.

        The prefix is the SOURCE_NAME_TEXT constant of the module that
        inspect.getmodule() reports for this instance, followed by ": ".
        """
        source_module = inspect.getmodule(self)
        prefix = source_module.SOURCE_NAME_TEXT + ": "
        return prefix + self.search
class AlertSource(alert.AlertSource):
    """Alert source whose alerts arrive as email, read over IMAP."""

    def __init__(self, account, imaphost):
        """Open a connection to an email account.

        Args:
            account: account name used to log in.
            imaphost: host name of the IMAP server; the connection is made
                with IMAP4_SSL.

        The password is prompted for interactively via getpass.
        """
        # Module of the concrete alert source. get_pub_alerts() reads
        # SOURCE_NAME_TEXT and sniff_class_for_alert() from it, so it has to
        # be set to a real module before that method is called.
        self.module = None

        # NOTE(review): no ssl_context is passed here (the original had a
        # commented-out ssl.create_default_context()). Consider supplying
        # one so the server certificate is verified - confirm against the
        # imaplib / ssl documentation for the Python version in use.
        self._connection = imaplib.IMAP4_SSL(imaphost)
        self._connection.login(account, getpass.getpass())

        self._current_email_alerts = []    # TODO: May not need this.
        self._current_pub_alerts = []
        # Data part of the most recent UID SEARCH response.
        self._msg_nums = None

    def get_pub_alerts(self, senders, mailbox, since, before):
        """Return all the pub_alerts from this source in a date range.

        Senders is a collection rather than a single address because
        providers sometimes change the sending email address. Using all
        known addresses, instead of just the latest one, allows scanning as
        far back as possible.

        Args:
            senders: iterable of sender email addresses to search for.
            mailbox: name of the mailbox to search; it is selected
                read-only.
            since: start date, passed to the IMAP SENTSINCE search key
                (presumably in IMAP date format, e.g. 01-Jan-2020 - verify
                against caller).
            before: end date, passed to the IMAP SENTBEFORE search key.

        Returns:
            An iterator over the pub_alerts found in all matching emails.

        A warning is written to stderr for each email that contains no
        pub_alerts (when that alert has warn_if_empty set), and once if no
        emails at all were found for any of the senders.
        """
        self._current_email_alerts = []    # TODO: May not need this.
        self._current_pub_alerts = []
        emails_found = 0

        for sender in senders:
            search_string = _build_imap_search_string(sender, since, before)
            self._connection.select(mailbox, True)
            _, self._msg_nums = self._connection.uid(
                'search', None, search_string)

            # The response data is a list whose first item holds the
            # space-separated message ids; split it to get the ids.
            msg_nums = self._msg_nums[0].split()
            emails_found += len(msg_nums)

            for msg_num in msg_nums:
                _, header = self._connection.uid(
                    "fetch", msg_num, HEADER_PARTS)
                _, body = self._connection.uid(
                    "fetch", msg_num, BODY_PARTS)
                email = Email(header, body)

                # Email alerts can have different versions. Detect which
                # version this is and then invoke the correct constructor
                # for the version.
                alert_class = self.module.sniff_class_for_alert(email)
                email_alert = alert_class(email)
                self._current_email_alerts.append(email_alert)

                # Within each email / alert there is a pub_alert for each
                # pub; an email can contain 0, 1, or more pub_alerts.
                # Truthiness also covers pub_alerts still being None.
                if email_alert.pub_alerts:
                    self._current_pub_alerts += email_alert.pub_alerts
                elif email_alert.warn_if_empty:
                    print("Warning: Alert for search", file=sys.stderr)
                    print(
                        " '" + email_alert.search + "'",
                        file=sys.stderr)
                    print(
                        " from source '" + self.module.SOURCE_NAME_TEXT
                        + "' does not contain any papers.\n",
                        file=sys.stderr)

        # Count message ids across every sender: the response list itself is
        # not empty even when the search matched nothing, so its length
        # cannot be used to detect "no emails".
        if not emails_found:
            print(
                "Warning: No emails were found from "
                + self.module.SOURCE_NAME_TEXT + "\n",
                file=sys.stderr)

        return iter(self._current_pub_alerts)
def _build_imap_search_string(
        sender=None,
        sentSince=None,
        sentBefore=None):
    """Build a parenthesised IMAP search string from the given inputs.

    Args:
        sender: sender address to match; emitted as a quoted From
            criterion.
        sentSince: date string for the SENTSINCE criterion.
        sentBefore: date string for the SENTBEFORE criterion.

    Returns:
        The criteria joined by single spaces and wrapped in parentheses,
        in the order SENTSINCE, SENTBEFORE, From.

    Raises:
        AssertionError: if every parameter is empty or None; at least one
            search parameter must be provided.
    """
    clauses = []
    if sentSince:
        clauses.append('SENTSINCE ' + sentSince)
    if sentBefore:
        clauses.append('SENTBEFORE ' + sentBefore)
    if sender:
        # Backslash and double quote must be escaped inside an IMAP quoted
        # string, otherwise they would end or corrupt the criterion.
        escaped = sender.replace('\\', '\\\\').replace('"', '\\"')
        clauses.append('From "' + escaped + '"')

    if not clauses:
        raise AssertionError(
            "At least one parameter must be passed to "
            "_build_imap_search_string")

    return '(' + " ".join(clauses) + ')'