File: test_mpwriter.py

from __future__ import with_statement
import random
from collections import deque

import pytest

from whoosh import fields, query
from whoosh.compat import u, izip, xrange, permutations, text_type
from whoosh.util.numeric import length_to_byte, byte_to_length
from whoosh.util.testing import TempIndex
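
# Tests for whoosh.multiproc.MpWriter, which farms indexing out to a pool of
# worker processes, and SerialMpWriter, a single-process stand-in with the
# same interface. A minimal usage sketch (hedged: assumes an existing
# directory index at "indexdir"; the tests below use TempIndex instead):
#
#     from whoosh import index
#     from whoosh.multiproc import MpWriter
#
#     ix = index.open_dir("indexdir")
#     with MpWriter(ix, procs=4) as w:
#         w.add_document(text=u"hello world")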


def check_multi():
    """Skip the calling test if the multiprocessing module is missing or
    unusable on this platform. Called at the top of every test in this
    module.
    """

    try:
        import multiprocessing
        import multiprocessing.synchronize  # @UnusedImport
    except ImportError:
        pytest.skip()
    else:
        try:
            # Instantiating a Queue exercises the platform's semaphore
            # support, which raises OSError where it is unavailable
            from multiprocessing import Queue
            Queue()
        except OSError:
            pytest.skip()


def _byten(n):
    """Round-trip a length through Whoosh's lossy one-byte length encoding,
    giving the value the index actually stores for a field length of n.
    """
    return byte_to_length(length_to_byte(n))
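
# Because field lengths are stored in a single byte, stored lengths are only
# approximate for larger values, so tests compare a stored length against
# _byten(actual) rather than against the raw word count. (Hedged
# illustration; the exact thresholds depend on the encoding table:)
#     _byten(3)      # small lengths usually round-trip unchanged
#     _byten(10000)  # large lengths come back as a nearby representative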


def _do_basic(writerclass):
    # Create the domain data

    # List of individual words added to the index
    words = []
    # List of string values added to the index
    docs = []
    # A ring buffer for creating string values
    buf = deque()
    for ls in permutations(u("abcd")):
        word = "".join(ls)
        # Remember this word is in the index (to check lexicon)
        words.append(word)

        # Append this word and pop the oldest one off the front so that each
        # document contains at most 10 words
        buf.append(word)
        if len(buf) > 10:
            buf.popleft()
        # Create a copy of the buffer and shuffle it to create a document value
        # and add it to the list of document values
        doc = list(buf)
        random.shuffle(doc)
        docs.append(" ".join(doc))
    # Shuffle the list of document values
    random.shuffle(docs)
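    # Note: permutations() (itertools.permutations under the hood) yields the
    # permutations of the sorted string "abcd" in lexicographic order, so
    # `words` is already sorted and can be compared directly against the
    # index's sorted term lexicon below.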

    schema = fields.Schema(text=fields.TEXT(stored=True, spelling=True,
                                            vector=True),
                           row=fields.NUMERIC(stored=True))

    with TempIndex(schema, storage_debug=True) as ix:
        # Add the domain data to the index
        with writerclass(ix, procs=3) as w:
            for i, value in enumerate(docs):
                w.add_document(text=value, row=i)

        with ix.searcher() as s:
            r = s.reader()

            # Check the lexicon
            for word, term in izip(words, r.field_terms("text")):
                assert word == term
            # Check the doc count
            assert r.doc_count_all() == len(docs)

            # Check there are lengths
            total = sum(r.doc_field_length(docnum, "text", 0)
                        for docnum in xrange(r.doc_count_all()))
            assert total > 0

            # Check per-doc info
            for i, value in enumerate(docs):
                pieces = value.split()
                docnum = s.document_number(row=i)

                # Check stored value
                sv = r.stored_fields(docnum)
                assert sv["text"] == value

                # Check vectors
                vr = r.vector(docnum, "text")
                # Get the terms and positions from the vector matcher
                iv = list(vr.items_as("positions"))
                # What the vector should look like
                ov = sorted((text, [pos]) for pos, text in enumerate(pieces))
                assert iv == ov

                # Check field length
                assert r.doc_field_length(docnum, "text") == len(pieces)


def test_basic_serial():
    check_multi()
    from whoosh.multiproc import SerialMpWriter

    _do_basic(SerialMpWriter)


def test_basic_multi():
    check_multi()
    from whoosh.multiproc import MpWriter

    _do_basic(MpWriter)


def test_no_add():
    check_multi()
    from whoosh.multiproc import MpWriter

    schema = fields.Schema(text=fields.TEXT(stored=True, spelling=True,
                                            vector=True))
    with TempIndex(schema) as ix:
        # Requesting procs > 1 should hand back an MpWriter, and opening and
        # closing it without adding any documents should work
        with ix.writer(procs=3) as w:
            assert type(w) == MpWriter


def _do_merge(writerclass):
    schema = fields.Schema(key=fields.ID(stored=True, unique=True),
                           value=fields.TEXT(stored=True, spelling=True,
                                             vector=True))

    domain = {"a": "aa", "b": "bb cc", "c": "cc dd ee", "d": "dd ee ff gg",
              "e": "ee ff gg hh ii", "f": "ff gg hh ii jj kk",
              "g": "gg hh ii jj kk ll mm", "h": "hh ii jj kk ll mm nn oo",
              "i": "ii jj kk ll mm nn oo pp qq ww ww ww ww ww ww",
              "j": "jj kk ll mm nn oo pp qq rr ss",
              "k": "kk ll mm nn oo pp qq rr ss tt uu"}

    with TempIndex(schema) as ix:
        w = ix.writer()
        for key in "abc":
            w.add_document(key=u(key), value=u(domain[key]))
        w.commit()

        w = ix.writer()
        for key in "def":
            w.add_document(key=u(key), value=u(domain[key]))
        w.commit(merge=False)

        w = writerclass(ix, procs=3)
        del domain["b"]
        w.delete_by_term("key", u("b"))

        domain["e"] = "xx yy zz"
        w.update_document(key=u("e"), value=u(domain["e"]))

        for key in "ghijk":
            w.add_document(key=u(key), value=u(domain[key]))
        # optimize=True merges all segments (and applies the deletion and
        # update) down to a single segment
        w.commit(optimize=True)

        assert len(ix._segments()) == 1

        with ix.searcher() as s:
            r = s.reader()

            assert s.doc_count() == len(domain)

            assert "".join(r.field_terms("key")) == "acdefghijk"
            assert " ".join(r.field_terms("value")) == "aa cc dd ee ff gg hh ii jj kk ll mm nn oo pp qq rr ss tt uu ww xx yy zz"

            for key in domain:
                docnum = s.document_number(key=key)
                assert docnum is not None

                length = r.doc_field_length(docnum, "value")
                assert length
                assert _byten(len(domain[key].split())) == length

                sf = r.stored_fields(docnum)
                assert domain[key] == sf["value"]

            words = sorted(set((" ".join(domain.values())).split()))
            assert words == list(r.field_terms("value"))

            for word in words:
                hits = s.search(query.Term("value", word))
                for hit in hits:
                    assert word in hit["value"].split()


def test_merge_serial():
    check_multi()
    from whoosh.multiproc import SerialMpWriter

    _do_merge(SerialMpWriter)


def test_merge_multi():
    check_multi()
    from whoosh.multiproc import MpWriter

    _do_merge(MpWriter)


def test_no_score_no_store():
    check_multi()
    from whoosh.multiproc import MpWriter

    schema = fields.Schema(a=fields.ID, b=fields.KEYWORD)
    domain = {}
    keys = list(u("abcdefghijklmnopqrstuvwx"))
    random.shuffle(keys)
    words = u("alfa bravo charlie delta").split()
    for i, key in enumerate(keys):
        domain[key] = words[i % len(words)]

    with TempIndex(schema) as ix:
        with MpWriter(ix, procs=3) as w:
            for key, value in domain.items():
                w.add_document(a=key, b=value)

        with ix.searcher() as s:
            for word in words:
                r = s.search(query.Term("b", word))
                # 24 keys spread round-robin over 4 words = 6 docs per word
                assert len(r) == 6


def test_multisegment():
    check_multi()
    from whoosh.multiproc import MpWriter

    schema = fields.Schema(a=fields.TEXT(stored=True, spelling=True,
                                         vector=True))
    words = u("alfa bravo charlie delta echo").split()
    with TempIndex(schema) as ix:
        with ix.writer(procs=3, multisegment=True, batchsize=10) as w:
            assert w.__class__ == MpWriter
            assert w.multisegment

            for ls in permutations(words, 3):
                w.add_document(a=u(" ").join(ls))

        # With multisegment=True the sub-writers' segments are kept instead
        # of being merged, so procs=3 should leave three segments
        assert len(ix._segments()) == 3

        with ix.searcher() as s:
            for word in words:
                r = s.search(query.Term("a", word))
                for hit in r:
                    assert word in hit["a"].split()


def test_batchsize_eq_doccount():
    # Edge case: the document count exactly fills a single batch
    check_multi()
    schema = fields.Schema(a=fields.KEYWORD(stored=True))
    with TempIndex(schema) as ix:
        with ix.writer(procs=4, batchsize=10) as w:
            for i in xrange(10):
                w.add_document(a=u(str(i)))


def test_finish_segment():
    check_multi()

    from whoosh.multiproc import MpWriter

    schema = fields.Schema(a=fields.KEYWORD(stored=True))
    with TempIndex(schema) as ix:
        # A tiny memory limit forces the pool writers to flush and finish
        # segments constantly, exercising the segment-finalization path
        w = MpWriter(ix, procs=2, batchsize=1, multisegment=False,
                     limitmb=0.00001)

        for i in range(100):
            w.add_document(a=text_type(i) * 10)

        w.commit()