File: mailocalypse.py

package info (click to toggle)
python-lamson 1.0pre11-1
  • links: PTS
  • area: main
  • in suites: jessie, jessie-kfreebsd, squeeze, wheezy
  • size: 3,508 kB
  • ctags: 1,036
  • sloc: python: 5,772; xml: 177; makefile: 19
file content (108 lines) | stat: -rw-r--r-- 2,788 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import email
from email.header import make_header, decode_header
from string import capwords
import sys
import mailbox


# Number of text parts whose conversion to UTF-8 raised an error; bumped by
# mail_load_cleanse() and reported on stderr when the script finishes.
BAD_MAIL = 0
# Number of text parts examined by mail_load_cleanse(), good or bad.
ALL_MAIL = 0


def all_parts(msg):
    """Return every sub-part of ``msg``, or ``[msg]`` when it has none.

    ``msg.walk()`` also yields the message itself, so anything comparing
    equal to ``msg`` is left out of the collected parts.
    """
    children = []
    for candidate in msg.walk():
        if candidate != msg:
            children.append(candidate)

    # A message without sub-parts is treated as its own single part.
    return children or [msg]

def collapse_header(header):
    if header.strip().startswith("=?"):
        decoded = decode_header(header)
        converted = (unicode(
            x[0], encoding=x[1] or 'ascii', errors='replace')
            for x in decoded)
        value = u"".join(converted)
    else:
        value = unicode(header, errors='replace')

    return value.encode("utf-8")


def convert_header_insanity(header):
    """Normalise a header value (or a list of them) with ``collapse_header``.

    Returns ``None`` unchanged, a new list of collapsed values when given a
    list, and the single collapsed value otherwise.
    """
    if header is None:
        return header

    # isinstance (rather than an exact type comparison) so that list
    # subclasses are handled element by element instead of being passed to
    # collapse_header() as if they were a single string.
    if isinstance(header, list):
        return [collapse_header(h) for h in header]

    return collapse_header(header)


def encode_header(name, val, charset='utf-8', msg=None):
    """Encode ``val`` as a MIME header value and optionally store it.

    name    -- header name to set on ``msg``.
    val     -- header text to encode.
    charset -- charset handed to ``make_header`` for ``val``.
    msg     -- optional message object; when given, the encoded value is
               assigned to ``msg[name]``.

    Returns the encoded header string.

    The message must be passed in explicitly: the body used to assign to a
    name ``msg`` that was neither a parameter nor a module global, so every
    call failed with NameError.
    """
    encoded = make_header([(val, charset)]).encode()

    if msg is not None:
        msg[name] = encoded

    return encoded


def bless_headers(msg):
    """Return a dict of ``msg``'s headers with values converted to UTF-8.

    Keys are the header names with each dash-separated word capitalised
    (via ``capwords``); values are ``msg[name]`` run through
    ``convert_header_insanity``.  Names that normalise to the same key
    overwrite one another.
    """
    return dict(
        (capwords(name, '-'), convert_header_insanity(msg[name]))
        for name in msg.keys())

def dump_headers(headers):
    for h in headers:
        print h, headers[h]

def mail_load_cleanse(msg_file):
    global ALL_MAIL
    global BAD_MAIL

    msg = email.message_from_file(msg_file)
    headers = bless_headers(msg)

    # go through every body and convert it to utf-8
    parts = all_parts(msg)
    bodies = []
    for part in parts:
        guts = part.get_payload(decode=True)
        if part.get_content_maintype() == "text":
            charset = part.get_charsets()[0]
            try:
                if charset:
                    uguts = unicode(guts, part.get_charsets()[0])
                    guts = uguts.encode("utf-8")
                else:
                    guts = guts.encode("utf-8")
            except UnicodeDecodeError, exc:
                print >> sys.stderr, "CONFLICTED CHARSET:", exc, part.get_charsets()
                BAD_MAIL += 1
            except LookupError, exc:
                print >> sys.stderr, "UNKNOWN CHARSET:", exc, part.get_charsets()
                BAD_MAIL += 1
            except Exception, exc:
                print >> sys.stderr, "WEIRDO ERROR", exc, part.get_charsets()
                BAD_MAIL += 1


            ALL_MAIL += 1

mb = None

try:
    mb = mailbox.Maildir(sys.argv[1])
    len(mb)  # need this to make the maildir try to read the directory and fail
except OSError:
    print "NOT A MAILDIR, TRYING MBOX"
    mb = mailbox.mbox(sys.argv[1])

if not mb:
    print "NOT A MAILDIR OR MBOX, SORRY"

for key in mb.keys():
    mail_load_cleanse(mb.get_file(key))

print >> sys.stderr, "ALL", ALL_MAIL
print >> sys.stderr, "BAD", BAD_MAIL