File: enron.py

package: python-whoosh 2.7.4+git6-g9134ad92-5
  • area: main
  • in suites: bullseye
  • size: 3,656 kB
  • sloc: python: 38,517; makefile: 118
from __future__ import division, print_function
import os.path, tarfile
from email import message_from_string
from marshal import dump, load
from zlib import compress, decompress

try:
    # xappy is optional; it is only needed for the Xapian benchmarks below
    import xappy
except ImportError:
    pass

from whoosh import analysis, fields
from whoosh.compat import urlretrieve, next
from whoosh.support.bench import Bench, Spec
from whoosh.util import now


# Benchmark class

class Enron(Spec):
    name = "enron"

    enron_archive_url = "http://www.cs.cmu.edu/~enron/enron_mail_082109.tar.gz"
    enron_archive_filename = "enron_mail_082109.tar.gz"
    cache_filename = "enron_cache.pickle"

    header_to_field = {"Date": "date", "From": "frm", "To": "to",
                   "Subject": "subject", "Cc": "cc", "Bcc": "bcc"}

    main_field = "body"
    headline_field = "subject"

    # Note: the sender field is named "frm", matching header_to_field above
    field_order = ("subject", "date", "frm", "to", "cc", "bcc", "body")

    cachefile = None

    # Functions for downloading and then reading the email archive and caching
    # the messages in an easier-to-digest format

    def download_archive(self, archive):
        print("Downloading Enron email archive to %r..." % archive)
        t = now()
        urlretrieve(self.enron_archive_url, archive)
        print("Downloaded in ", now() - t, "seconds")

    @staticmethod
    def get_texts(archive):
        # Stream the raw text of each message out of the .tar.gz archive
        archive = tarfile.open(archive, "r:gz")
        while True:
            entry = next(archive)
            # Drop the cached member list after each entry so memory use
            # stays flat while scanning the (very large) archive
            archive.members = []
            if entry is None:
                break
            f = archive.extractfile(entry)
            if f is not None:
                text = f.read()
                yield text

    @staticmethod
    def get_messages(archive, headers=True):
        header_to_field = Enron.header_to_field
        for text in Enron.get_texts(archive):
            message = message_from_string(text)
            # The body starts after the first blank line; strip the headers.
            # (The .decode() calls assume Python 2 byte strings.)
            body = message.as_string().decode("latin_1")
            blank = body.find("\n\n")
            if blank > -1:
                body = body[blank + 2:]
            d = {"body": body}
            if headers:
                # Copy the interesting headers into the dict under the
                # schema field names from header_to_field
                for k in message.keys():
                    fn = header_to_field.get(k)
                    if not fn:
                        continue
                    v = message.get(k).strip()
                    if v:
                        d[fn] = v.decode("latin_1")
            yield d
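
    # Each dict yielded by get_messages() looks roughly like this (values
    # are illustrative):
    #   {"body": u"...", "date": u"...", "frm": u"...", "to": u"...",
    #    "subject": u"...", "cc": u"...", "bcc": u"..."}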

    def cache_messages(self, archive, cache):
        print("Caching messages in %s..." % cache)

        if not os.path.exists(archive):
            raise Exception("Archive file %r does not exist" % archive)

        t = now()
        f = open(cache, "wb")
        c = 0
        for d in self.get_messages(archive):
            c += 1
            dump(d, f)
            # Print a progress tick every 1000 messages
            if not c % 1000:
                print(c)
        f.close()
        print("Cached messages in ", now() - t, "seconds")

    def setup(self):
        archive = os.path.abspath(os.path.join(self.options.dir, self.enron_archive_filename))
        cache = os.path.abspath(os.path.join(self.options.dir, self.cache_filename))

        if not os.path.exists(archive):
            self.download_archive(archive)
        else:
            print("Archive is OK")

        if not os.path.exists(cache):
            self.cache_messages(archive, cache)
        else:
            print("Cache is OK")

    def documents(self):
        if not os.path.exists(self.cache_filename):
            raise Exception("Message cache does not exist, use --setup")

        f = open(self.cache_filename, "rb")
        try:
            while True:
                # Remember where this record starts so the stored "filepos"
                # field can be used to fetch the body again at search time
                self.filepos = f.tell()
                d = load(f)
                yield d
        except EOFError:
            # marshal.load raises EOFError at the end of the stream
            pass
        f.close()

    def whoosh_schema(self):
        ana = analysis.StemmingAnalyzer(maxsize=40, cachesize=None)
        storebody = self.options.storebody
        schema = fields.Schema(body=fields.TEXT(analyzer=ana, stored=storebody),
                               filepos=fields.STORED,
                               date=fields.ID(stored=True),
                               frm=fields.ID(stored=True),
                               to=fields.IDLIST(stored=True),
                               subject=fields.TEXT(stored=True),
                               cc=fields.IDLIST,
                               bcc=fields.IDLIST)
        return schema
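
        # A minimal sketch of using the schema above outside the Bench
        # harness (the directory name "enron_ix" is illustrative; Bench
        # normally supplies self.options):
        #
        #   import os
        #   from whoosh import index
        #   if not os.path.exists("enron_ix"):
        #       os.mkdir("enron_ix")
        #   ix = index.create_in("enron_ix", schema)
        #   with ix.writer() as w:
        #       w.add_document(subject=u"test", body=u"hello", filepos=0)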

    def xappy_indexer_connection(self, path):
        conn = xappy.IndexerConnection(path)
        conn.add_field_action('body', xappy.FieldActions.INDEX_FREETEXT, language='en')
        if self.options.storebody:
            conn.add_field_action('body', xappy.FieldActions.STORE_CONTENT)
        conn.add_field_action('date', xappy.FieldActions.INDEX_EXACT)
        conn.add_field_action('date', xappy.FieldActions.STORE_CONTENT)
        conn.add_field_action('frm', xappy.FieldActions.INDEX_EXACT)
        conn.add_field_action('frm', xappy.FieldActions.STORE_CONTENT)
        conn.add_field_action('to', xappy.FieldActions.INDEX_EXACT)
        conn.add_field_action('to', xappy.FieldActions.STORE_CONTENT)
        conn.add_field_action('subject', xappy.FieldActions.INDEX_FREETEXT, language='en')
        conn.add_field_action('subject', xappy.FieldActions.STORE_CONTENT)
        conn.add_field_action('cc', xappy.FieldActions.INDEX_EXACT)
        conn.add_field_action('bcc', xappy.FieldActions.INDEX_EXACT)
        return conn

    def zcatalog_setup(self, cat):
        from zcatalog import indexes
        for name in ("date", "frm"):
            cat[name] = indexes.FieldIndex(field_name=name)
        for name in ("to", "subject", "cc", "bcc", "body"):
            cat[name] = indexes.TextIndex(field_name=name)

    def process_document_whoosh(self, d):
        d["filepos"] = self.filepos
        if self.options.storebody:
            # Store the body zlib-compressed (level 9) instead of as plain
            # text, trading CPU for a smaller index
            mf = self.main_field
            d["_stored_%s" % mf] = compress(d[mf], 9)

    def process_result_whoosh(self, d):
        mf = self.main_field
        if mf in d:
            # The body was stored compressed in the index; inflate it
            d.fields()[mf] = decompress(d[mf])
        else:
            # The body was not stored; seek back into the message cache
            # using the stored file position and re-load the original dict
            if not self.cachefile:
                self.cachefile = open(self.cache_filename, "rb")
            filepos = d["filepos"]
            self.cachefile.seek(filepos)
            dd = load(self.cachefile)
            d.fields()[mf] = dd[mf]
        return d

    def process_document_xapian(self, d):
        d[self.main_field] = " ".join([d.get(name, "") for name
                                       in self.field_order])
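
# A rough sketch of how this benchmark is typically driven from the command
# line (--setup is referenced in documents() above; --dir and --storebody are
# assumed from the option names used in this class, since the actual flags
# are defined in whoosh.support.bench):
#
#   python enron.py --setup      # download the archive and build the cache
#   python enron.py              # run the benchmark proper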



if __name__ == "__main__":
    Bench().run(Enron)