1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313
|
#### PATTERN | WEB | IMAP ##########################################################################
# Copyright (c) 2010 University of Antwerp, Belgium
# Author: Tom De Smedt <tom@organisms.be>
# License: BSD (see LICENSE.txt for details).
# http://www.clips.ua.ac.be/pages/pattern
####################################################################################################
import os
import imaplib
import re
import email
import time
try:
MODULE = os.path.dirname(os.path.realpath(__file__))
except:
MODULE = ""
# Import the Cache class from pattern.web so e-mails can be cached locally (faster):
try: from ..cache import cache
except:
try:
import os, sys; sys.path.append(os.path.join(MODULE, ".."))
from cache import cache
except:
try:
from pattern.web.cache import cache
except:
cache = {}
#### STRING FUNCTIONS ##############################################################################
def decode_utf8(string):
""" Returns the given string as a unicode string (if possible).
"""
if isinstance(string, str):
for encoding in (("utf-8",), ("windows-1252",), ("utf-8", "ignore")):
try:
return string.decode(*encoding)
except:
pass
return string
return unicode(string)
def encode_utf8(string):
""" Returns the given string as a Python byte string (if possible).
"""
if isinstance(string, unicode):
try:
return string.encode("utf-8")
except:
return string
return str(string)
#### IMAP4 SSL #####################################################################################
# Fixes an issue in Python 2.5- with memory allocation.
# See: http://bugs.python.org/issue1389051
class IMAP4(imaplib.IMAP4):
pass
class IMAP4_SSL(imaplib.IMAP4_SSL):
def read(self, size):
"""Read 'size' bytes from remote."""
# sslobj.read() sometimes returns < size bytes
chunks = []
read = 0
while read < size:
data = self.sslobj.read(min(size-read, 16384)) # use min() instead of max().
read += len(data)
chunks.append(data)
return ''.join(chunks)
#### MAIL ##########################################################################################
GMAIL = "imap.gmail.com"
DATE, FROM, SUBJECT, BODY, ATTACHMENTS = \
"date", "from", "subject", "body", "attachments"
def _basename(folder):
# [Gmail]/INBOX => inbox
f = folder.replace("[Gmail]/","")
f = f.replace("[Gmail]","")
f = f.replace("Mail", "") # "Sent Mail" alias = "sent".
f = f.replace("INBOX.", "") # "inbox.sent" alias = "sent".
f = f.lower()
f = f.strip()
return f
class MailError(Exception):
pass
class MailServiceError(MailError):
pass
class MailLoginError(MailError):
pass
class MailNotLoggedIn(MailError):
pass
class Mail(object):
def __init__(self, username, password, service=GMAIL, port=993, secure=True):
""" IMAP4 connection to a mailbox. With secure=True, SSL is used.
The standard port for SSL is 993.
The standard port without SSL is 143.
"""
self._username = username
self._password = password
self._host = service
self._port = port
self._secure = secure
self._imap4 = None
self._folders = None
self.login(username, password)
@property
def _id(self):
return "%s:%s@%s:%s" % (self._username, self._password, self._host, self._port)
@property
def imap4(self):
if self._imap4 is None:
raise MailNotLoggedIn
return self._imap4
def login(self, username, password, **kwargs):
""" Signs in to the mail account with the given username and password,
raises a MailLoginError otherwise.
"""
self.logout()
self._secure = kwargs.get("secure", self._secure)
self._imap4 = (self._secure and IMAP4_SSL or IMAP4)(self._host, self._port)
try:
status, response = self._imap4.login(username, password)
except:
raise MailLoginError
if status != "OK":
raise MailLoginError(response)
def logout(self):
""" Signs out of the mail account.
"""
if self._imap4 is not None:
self._imap4.logout()
self._imap4 = None
def __del__(self):
if "_imap4" in self.__dict__:
if self._imap4 is not None:
self._imap4.logout()
self._imap4 = None
@property
def folders(self):
""" A dictionary of (name, MailFolder)-tuples.
Default folders: inbox, trash, spam, receipts, ...
"""
if self._folders is None:
status, response = self.imap4.list()
self._folders = [f.split(" \"")[-1].strip(" \"") for f in response]
self._folders = [(_basename(f), MailFolder(self, f)) for f in self._folders]
self._folders = [(f, o) for f, o in self._folders if f != ""]
self._folders = dict(self._folders)
return self._folders
def __getattr__(self, k):
""" Each folder is accessible as Mail.[name].
"""
if k in self.__dict__:
return self.__dict__[k]
if k in self.folders:
return self.folders[k]
raise AttributeError("'Mail' object has no attribute '%s'" % k)
#--- MAIL FOLDER -----------------------------------------------------------------------------------
def _decode(s, message):
try:
# Decode message Content-Type charset to Unicode.
# If all fails, try Latin-1 (common case).
e = message.get("Content-Type")
e = e.split("charset=")[-1].split(";")[0].strip("\"'").lower()
s = s.decode(e)
except:
try: s = s.decode("utf-8")
except:
try: s = s.decode("latin-1")
except:
pass
return s
class MailFolder(object):
def __init__(self, parent, name):
""" A folder (inbox, spam, trash, ...) in a mailbox.
E-mail messages can be searched and retrieved (including attachments) from a folder.
"""
self._parent = parent
self._name = name
@property
def parent(self):
return self._parent
@property
def name(self):
return _basename(self._name)
@property
def count(self):
return len(self)
def search(self, q, field=FROM, cached=False):
""" Returns a list of indices for the given query, latest-first.
The search field can be FROM, DATE or SUBJECT.
"""
id = "mail-%s-%s-%s-%s" % (self.parent._id, self.name, q, field)
if cached and id in cache:
status, response = "OK", [cache[id]]
else:
status, response = self.parent.imap4.select(self._name, readonly=1)
status, response = self.parent.imap4.search(None, field.upper(), q)
if cached:
cache[id] = response[0]
return sorted([int(i)-1 for i in response[0].split()], reverse=True)
def read(self, i, attachments=False, cached=True):
return self.__getitem__(i, attachments, cached)
def __getitem__(self, i, attachments=False, cached=True):
""" Returns the mail message with the given index.
Each message is a dictionary with date, from, subject, body, attachments entries.
The attachments entry is a list of (MIME-type, str)-tuples.
"""
i += 1
id = "mail-%s-%s-%s-%s" % (self.parent._id, self.name, i, attachments)
if cached and id in cache:
m = cache[id]
else:
# Select the current mail folder.
# Get the e-mail header.
# Get the e-mail body, with or without file attachments.
status, response = self.parent.imap4.select(self._name, readonly=1)
status, response1 = self.parent.imap4.fetch(str(i), '(BODY.PEEK[HEADER])')
status, response2 = self.parent.imap4.fetch(str(i), '(BODY.PEEK[%s])' % (not attachments and "TEXT" or ""))
time.sleep(0.1)
m = response1[0][1] + response2[0][1]
# Cache the raw message for faster retrieval.
if cached:
cache[id] = m
# Parse the raw message.
m = email.message_from_string(encode_utf8(m))
d = Message([
(DATE, _decode(m.get(DATE), m)),
(FROM, _decode(m.get(FROM), m)),
(SUBJECT, _decode(m.get(SUBJECT), m)),
(BODY, ""),
(ATTACHMENTS, [])])
# Message body can be a list of parts, including file attachments.
for p in (m.is_multipart() and m.get_payload() or [m]):
if p.get_content_type() == "text/plain":
d[BODY] += _decode(p.get_payload(decode=True), p)
elif attachments:
d[ATTACHMENTS].append((p.get_content_type(), p.get_payload()))
for k in d:
if isinstance(d[k], basestring):
d[k] = d[k].strip()
d[k] = d[k].replace("\r\n", "\n")
return d
def __iter__(self):
""" Returns an iterator over all the messages in the folder, latest-first.
"""
for i in reversed(range(len(self))):
yield self[i]
def __len__(self):
status, response = self.parent.imap4.select(self.name, readonly=1)
return int(response[0])
def __repr__(self):
return "MailFolder(name=%s)" % repr(self.name)
#--- MAIL MESSAGE ----------------------------------------------------------------------------------
class Message(dict):
@property
def author(self):
return self.get(FROM, None)
@property
def date(self):
return self.get(DATE, None)
@property
def subject(self):
return self.get(SUBJECT, "")
@property
def body(self):
return self.get(BODY, "")
@property
def attachments(self):
return self.get(ATTACHMENTS, [])
@property
def email_address(self):
m = re.search(r"<(.*?)>", self.author)
return m and m.group(1) or ""
def __repr__(self):
return "Message(from=%s, subject=%s)" % (
repr(self.author),
repr(self.subject))
|