File: __init__.py

package info (click to toggle)
python-pattern 2.6%2Bgit20150109-3
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 78,672 kB
  • sloc: python: 53,865; xml: 11,965; ansic: 2,318; makefile: 94
file content (313 lines) | stat: -rw-r--r-- 10,519 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
#### PATTERN | WEB | IMAP ##########################################################################
# Copyright (c) 2010 University of Antwerp, Belgium
# Author: Tom De Smedt <tom@organisms.be>
# License: BSD (see LICENSE.txt for details).
# http://www.clips.ua.ac.be/pages/pattern

####################################################################################################

import os
import imaplib
import re
import email
import time

# Directory that contains this module; used to locate a cache module one directory up.
try:
    MODULE = os.path.dirname(os.path.realpath(__file__))
except (NameError, OSError):
    # __file__ can be undefined (e.g., when the source is exec'd rather than imported),
    # and resolving a path can fail with an OSError.
    # Other exceptions (KeyboardInterrupt, SystemExit, ...) are no longer swallowed.
    MODULE = ""

# Import the Cache class from pattern.web so e-mails can be cached locally (faster):
# Three attempts, in order: the package-relative module, a cache module one directory up,
# and the installed pattern package. Each attempt stays best-effort ("except Exception"),
# but KeyboardInterrupt and SystemExit are no longer swallowed as with a bare "except:".
try:
    from ..cache import cache
except Exception:
    try:
        import sys
        sys.path.append(os.path.join(MODULE, ".."))
        from cache import cache
    except Exception:
        try:
            from pattern.web.cache import cache
        except Exception:
            # No cache implementation available: fall back to a dictionary,
            # which supports the same "key in cache" and cache[key] operations.
            cache = {}

#### STRING FUNCTIONS ##############################################################################

def decode_utf8(string):
    """ Returns the given string as a unicode string (if possible).
        Byte strings are decoded as UTF-8, then as Windows-1252,
        and finally as UTF-8 with undecodable bytes ignored.
        Any other object is converted with unicode() (str() if unicode() does not exist).
    """
    # "bytes" is an alias of "str" in Python 2, so this is the original check.
    if isinstance(string, bytes):
        for encoding in (("utf-8",), ("windows-1252",), ("utf-8", "ignore")):
            try:
                return string.decode(*encoding)
            except UnicodeError:
                # Only decoding failures trigger the next attempt.
                pass
        return string
    try:
        text_type = unicode  # Python 2
    except NameError:
        text_type = str      # No unicode(): str is the text type.
    return text_type(string)
    
def encode_utf8(string):
    """ Returns the given string as a Python byte string (if possible).
        Unicode strings are encoded as UTF-8, byte strings are returned unchanged,
        any other object is converted with str() first.
    """
    try:
        text_type = unicode  # Python 2
    except NameError:
        text_type = str      # No unicode(): str is the text type.
    if isinstance(string, bytes):
        return string
    if not isinstance(string, text_type):
        string = str(string)
        if isinstance(string, bytes):
            # Python 2: str() already yields a byte string.
            return string
    try:
        return string.encode("utf-8")
    except UnicodeError:
        # Not encodable (e.g., lone surrogates): return the text unchanged.
        return string

#### IMAP4 SSL #####################################################################################
# Fixes an issue in Python 2.5- with memory allocation.
# See: http://bugs.python.org/issue1389051

class IMAP4(imaplib.IMAP4):
    """ IMAP4 connection without SSL; adds nothing to imaplib.IMAP4.
    """

class IMAP4_SSL(imaplib.IMAP4_SSL):
    """ IMAP4 connection over SSL, with a read() that requests the data
        in chunks of at most 16KB (see http://bugs.python.org/issue1389051).
    """

    def read(self, size):
        """Read 'size' bytes from remote.
        Fewer bytes are returned if the stream ends before 'size' bytes have arrived.
        """
        # sslobj.read() sometimes returns < size bytes, so keep reading until done.
        chunks = []
        remaining = size
        while remaining > 0:
            data = self.sslobj.read(min(remaining, 16384)) # use min() instead of max().
            if not data:
                # End of stream (connection closed).
                # Without this check the loop never terminates.
                break
            chunks.append(data)
            remaining -= len(data)
        return ''.join(chunks)

#### MAIL ##########################################################################################

# Host name of the Gmail IMAP service.
GMAIL = "imap.gmail.com"

# Names of the entries in a mail message.
DATE        = "date"
FROM        = "from"
SUBJECT     = "subject"
BODY        = "body"
ATTACHMENTS = "attachments"
    
def _basename(folder):
    # [Gmail]/INBOX => inbox
    f = folder.replace("[Gmail]/","")
    f = f.replace("[Gmail]","")
    f = f.replace("Mail", "")   # "Sent Mail" alias = "sent".
    f = f.replace("INBOX.", "") # "inbox.sent" alias = "sent".
    f = f.lower()
    f = f.strip()
    return f

class MailError(Exception):
    """ Base class for the mail exceptions below.
    """

class MailServiceError(MailError):
    """ Mail service error (not raised by the code in this module itself).
    """

class MailLoginError(MailError):
    """ Raised by Mail.login() when signing in fails.
    """

class MailNotLoggedIn(MailError):
    """ Raised by Mail.imap4 when there is no connection to the mail account.
    """

class Mail(object):
    """ A mailbox on an IMAP4 server.
        Each folder in the mailbox is available in Mail.folders and as Mail.[name].
    """
    
    def __init__(self, username, password, service=GMAIL, port=993, secure=True):
        """ IMAP4 connection to a mailbox. With secure=True, SSL is used. 
            The standard port for SSL is 993.
            The standard port without SSL is 143.
            Signs in right away, raises a MailLoginError if that fails.
        """
        self._username = username
        self._password = password
        self._host     = service
        self._port     = port
        self._secure   = secure
        self._imap4    = None
        self._folders  = None
        self.login(username, password)

    @property
    def _id(self):
        # Identifies account + server; MailFolder uses it as a prefix for its cache keys.
        # NOTE(review): the plain-text password ends up in every cache key.
        # Kept as-is because changing the format changes the keys of cached entries.
        return "%s:%s@%s:%s" % (self._username, self._password, self._host, self._port)

    @property
    def imap4(self):
        """ The active IMAP4 connection, raises MailNotLoggedIn if there is none.
        """
        if self._imap4 is None: 
            raise MailNotLoggedIn
        return self._imap4

    @staticmethod
    def _close(connection):
        # Best-effort LOGOUT of a connection that is being discarded.
        # A failure here must not mask the error that is about to be reported.
        try:
            connection.logout()
        except Exception:
            pass
 
    def login(self, username, password, **kwargs):
        """ Signs in to the mail account with the given username and password,
            raises a MailLoginError otherwise.
            The optional secure=True|False parameter switches SSL on or off.
        """
        import sys
        self.logout()
        self._secure = kwargs.get("secure", self._secure)
        # The connection is only stored in Mail._imap4 once it is authenticated,
        # so a failed login never leaves Mail.imap4 pointing to an unusable connection.
        connection = (self._secure and IMAP4_SSL or IMAP4)(self._host, self._port)
        try:
            status, response = connection.login(username, password)
        except Exception:
            # Pass the reason on to the caller instead of discarding it.
            reason = sys.exc_info()[1]
            self._close(connection)
            raise MailLoginError(reason)
        if status != "OK":
            self._close(connection)
            raise MailLoginError(response)
        self._imap4    = connection
        # Mail._id (cache keys) and the folder list must reflect the account now signed in.
        self._username = username
        self._password = password
        self._folders  = None
 
    def logout(self):
        """ Signs out of the mail account.
        """
        if self._imap4 is not None:
            try:
                self._imap4.logout()
            finally:
                # Forget the connection even if the LOGOUT command raises an error.
                self._imap4 = None
        
    def __del__(self):
        # _imap4 is missing from __dict__ if __init__ never ran.
        if self.__dict__.get("_imap4") is not None:
            try:
                self.logout()
            except Exception:
                # A destructor should not raise.
                pass
    
    @property
    def folders(self):
        """ A dictionary of name => MailFolder items.
            Default folders: inbox, trash, spam, receipts, ...
        """
        if self._folders is None:
            status, response = self.imap4.list()
            folders = {}
            for f in response:
                if f is None:
                    # imaplib yields [None] if the server returned no data.
                    continue
                # The folder name is the text after the last ' "' in each response line.
                f = f.split(" \"")[-1].strip(" \"")
                alias = _basename(f)
                if alias != "":
                    folders[alias] = MailFolder(self, f)
            # Assigned in one step, so an error above leaves Mail._folders untouched.
            self._folders = folders
        return self._folders
    
    def __getattr__(self, k):
        """ Each folder is accessible as Mail.[name].
        """
        if k in self.__dict__:
            return self.__dict__[k]
        # The folder list is only consulted once __init__ has set _folders,
        # and never for special __names__. Otherwise an instance without _folders recurses
        # forever (folders => self._folders => __getattr__ => folders => ...),
        # and any probe for a special method would send a LIST command to the server.
        special = k.startswith("__") and k.endswith("__")
        if "_folders" in self.__dict__ and not special:
            if k in self.folders:
                return self.folders[k]
        raise AttributeError("'Mail' object has no attribute '%s'" % k)

#--- MAIL FOLDER -----------------------------------------------------------------------------------

def _decode(s, message):
    try:
        # Decode message Content-Type charset to Unicode.
        # If all fails, try Latin-1 (common case).
        e = message.get("Content-Type")
        e = e.split("charset=")[-1].split(";")[0].strip("\"'").lower()
        s = s.decode(e)
    except:
        try: s = s.decode("utf-8")
        except:
            try: s = s.decode("latin-1")
            except: 
                pass 
    return s

class MailFolder(object):
    
    def __init__(self, parent, name):
        """ A folder (inbox, spam, trash, ...) in a mailbox.
            E-mail messages can be searched and retrieved (including attachments) from a folder.
            The parent is the Mail object, the name is the folder name on the server.
        """
        self._parent = parent
        self._name   = name
    
    @property
    def parent(self):
        return self._parent
    
    @property
    def name(self):
        # The short alias (e.g., "sent"), not the folder name on the server (self._name).
        return _basename(self._name)
    
    @property
    def count(self):
        return len(self)

    def search(self, q, field=FROM, cached=False):
        """ Returns a list of indices for the given query, latest-first.
            The search field can be FROM, DATE or SUBJECT.
            With cached=True, the search result is stored in (and reused from) the cache.
        """
        key = "mail-%s-%s-%s-%s" % (self.parent._id, self.name, q, field)
        if cached and key in cache:
            ids = cache[key]
        else:
            self.parent.imap4.select(self._name, readonly=1)
            status, response = self.parent.imap4.search(None, field.upper(), q)
            ids = response[0]
            if cached:
                cache[key] = ids
        # Message numbers on the server start at 1, indices start at 0.
        # ids can be None if the server returned no search data.
        return sorted([int(i)-1 for i in (ids or "").split()], reverse=True)

    def read(self, i, attachments=False, cached=True):
        """ Returns the mail message with the given index (see MailFolder.__getitem__).
        """
        return self.__getitem__(i, attachments, cached)
    
    def __getitem__(self, i, attachments=False, cached=True):
        """ Returns the mail message with the given index.
            Each message is a dictionary with date, from, subject, body, attachments entries.
            The attachments entry is a list of (MIME-type, str)-tuples.
            Raises an IndexError if the server has no message with the given index.
        """
        i += 1 # Message numbers on the server start at 1.
        key = "mail-%s-%s-%s-%s" % (self.parent._id, self.name, i, attachments)
        if cached and key in cache:
            m = cache[key]
        else:
            # Select the current mail folder.
            # Get the e-mail header.
            # Get the e-mail body, with or without file attachments.
            self.parent.imap4.select(self._name, readonly=1)
            status, response1 = self.parent.imap4.fetch(str(i), '(BODY.PEEK[HEADER])')
            if status != "OK" or not response1 or not isinstance(response1[0], tuple):
                # No (envelope, data)-tuple means the server returned no such message.
                raise IndexError("mail message index out of range: %s" % (i-1))
            status, response2 = self.parent.imap4.fetch(str(i), '(BODY.PEEK[%s])' % ("" if attachments else "TEXT"))
            time.sleep(0.1)
            m = response1[0][1] + response2[0][1]
            # Cache the raw message for faster retrieval.
            if cached:
                cache[key] = m
        # Parse the raw message.
        m = email.message_from_string(encode_utf8(m))
        d = Message([
            (DATE, _decode(m.get(DATE), m)),
            (FROM, _decode(m.get(FROM), m)),
            (SUBJECT, _decode(m.get(SUBJECT), m)),
            (BODY, ""),
            (ATTACHMENTS, [])])
        # Message body can be a list of parts, including file attachments.
        # A message that is not multipart (or has no parts) is treated as a single part.
        parts = m.get_payload() if m.is_multipart() else []
        for p in (parts or [m]):
            if p.get_content_type() == "text/plain":
                d[BODY] += _decode(p.get_payload(decode=True), p)
            elif attachments:
                d[ATTACHMENTS].append((p.get_content_type(), p.get_payload()))
        for k in d:
            if isinstance(d[k], basestring):
                d[k] = d[k].strip()
                d[k] = d[k].replace("\r\n", "\n")
        return d
        
    def __iter__(self):
        """ Returns an iterator over all the messages in the folder, latest-first.
        """
        for i in reversed(range(len(self))):
            yield self[i]

    def __len__(self):
        """ Returns the number of messages in the folder.
        """
        # Select the folder by its name on the server (self._name), like search() does.
        # The short alias (self.name) is not a valid folder name for most folders.
        status, response = self.parent.imap4.select(self._name, readonly=1)
        return int(response[0])

    def __repr__(self):
        return "MailFolder(name=%s)" % repr(self.name)

#--- MAIL MESSAGE ----------------------------------------------------------------------------------

class Message(dict):
    """ A mail message: a dictionary with date, from, subject, body, attachments entries,
        which are also available as properties.
    """
    
    @property
    def author(self):
        return self.get(FROM, None)
    @property
    def date(self):
        return self.get(DATE, None)
    @property
    def subject(self):
        return self.get(SUBJECT, "")
    @property
    def body(self):
        return self.get(BODY, "")
    @property
    def attachments(self):
        return self.get(ATTACHMENTS, [])

    @property
    def email_address(self):
        """ The address between <angle brackets> in the author, or "" if there is none.
        """
        # The author is None if the message has no "from" entry.
        m = re.search(r"<(.*?)>", self.author or "")
        return m.group(1) if m else ""

    def __repr__(self):
        return "Message(from=%s, subject=%s)" % (
            repr(self.author),
            repr(self.subject))