File: test_misc.py

package info (click to toggle)
python-whoosh 2.7.4+git6-g9134ad92-5
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 3,656 kB
  • sloc: python: 38,517; makefile: 118
file content (161 lines) | stat: -rw-r--r-- 4,819 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
from __future__ import with_statement
import os, threading, time

from whoosh.compat import u
from whoosh.util.filelock import try_for
from whoosh.util.numeric import length_to_byte, byte_to_length
from whoosh.util.testing import TempStorage


def test_now():
    """Check that two successive ``now()`` readings never run backwards."""
    from whoosh.util import now

    earlier = now()
    later = now()
    assert earlier <= later


def test_storage_creation():
    """Create a FileStorage in a fresh directory, index into it, destroy it.

    The directory name is a random UUID under the system temp directory, so
    it must not exist beforehand. It must exist after ``create()`` and must
    be gone again after ``destroy()``.
    """
    import shutil
    import tempfile
    import uuid
    from whoosh import fields
    from whoosh.filedb.filestore import FileStorage

    schema = fields.Schema(text=fields.TEXT)
    dirpath = os.path.join(tempfile.gettempdir(), str(uuid.uuid4()))
    assert not os.path.exists(dirpath)

    st = FileStorage(dirpath)
    try:
        st.create()
        assert os.path.exists(dirpath)

        # Write a couple of documents so the storage actually holds files
        ix = st.create_index(schema)
        with ix.writer() as w:
            w.add_document(text=u("alfa bravo"))
            w.add_document(text=u("bracho charlie"))

        st.destroy()
        assert not os.path.exists(dirpath)
    finally:
        # If a step or assertion above failed, the directory would otherwise
        # be left behind in the temp dir. After a successful destroy() this
        # is a no-op.
        shutil.rmtree(dirpath, ignore_errors=True)


def test_ramstorage():
    """A lock obtained from a RamStorage can be acquired and released."""
    from whoosh.filedb.filestore import RamStorage

    storage = RamStorage()
    ram_lock = storage.lock("test")
    ram_lock.acquire()
    ram_lock.release()


def test_filelock_simple():
    """Two lock objects for the same name exclude each other.

    While one object holds "testlock", ``acquire()`` on the other returns a
    false value; once the holder releases, the roles can swap.
    """
    with TempStorage("simplefilelock") as storage:
        first = storage.lock("testlock")
        second = storage.lock("testlock")
        # Each lock() call must hand back its own object
        assert first is not second

        # first takes the lock, the lock file shows up, second is shut out
        assert first.acquire()
        assert storage.file_exists("testlock")
        assert not second.acquire()

        # After first lets go, second can take it and first is shut out
        first.release()
        assert second.acquire()
        assert not first.acquire()
        second.release()


def test_threaded_filelock():
    """A second thread waiting on a held file lock gets it after release."""
    with TempStorage("threadedfilelock") as st:
        main_lock = st.lock("testlock")
        acquired = []

        def contender():
            # Runs in the worker thread: keep trying to take the lock via
            # try_for (the 1.0 and 0.1 arguments are presumably a timeout
            # and a retry delay in seconds -- confirm against try_for), and
            # record success before handing the lock back.
            worker_lock = st.lock("testlock")
            if try_for(worker_lock.acquire, 1.0, 0.1):
                acquired.append(True)
                worker_lock.release()

        worker = threading.Thread(target=contender)

        # Hold the lock in the main thread before the worker starts
        main_lock.acquire()
        worker.start()
        # Keep the worker waiting for a moment, then let it through
        time.sleep(0.15)
        main_lock.release()
        worker.join()
        # The worker appends True to "acquired" only if it got the lock
        assert acquired == [True]


def test_length_byte():
    """Lengths 0 through 10 survive a length -> byte -> length round trip."""
    lengths = list(range(11))
    encoded = list(map(length_to_byte, lengths))
    decoded = list(map(byte_to_length, encoded))
    assert lengths == decoded


def test_clockface_lru():
    """Run ten calls through a size-5 ``clockface_lru_cache`` and check its
    results and reported statistics, before and after clearing it.
    """
    from whoosh.util.cache import clockface_lru_cache

    @clockface_lru_cache(5)
    def double(n):
        return n * 2

    arguments = (1, 2, 3, 4, 5, 4, 3, 2, 10, 1)
    doubled = [double(n) for n in arguments]
    assert doubled == [2, 4, 6, 8, 10, 8, 6, 4, 20, 2]
    # presumably (hits, misses, maxsize, currsize) -- confirm against
    # whoosh.util.cache
    assert double.cache_info() == (3, 7, 5, 5)
    double.cache_clear()
    assert double.cache_info() == (0, 0, 5, 0)


def test_double_barrel_lru():
    """Run ten calls through a size-5 ``lru_cache`` and check the results.

    Only the returned values are verified; the ``cache_info()`` assertions
    below are disabled.
    """
    from whoosh.util.cache import lru_cache

    @lru_cache(5)
    def double(n):
        return n * 2

    arguments = (1, 2, 3, 4, 5, 4, 3, 2, 10, 1)
    doubled = [double(n) for n in arguments]
    assert doubled == [2, 4, 6, 8, 10, 8, 6, 4, 20, 2]
    # Disabled statistics checks (hits, misses, maxsize and currsize):
    # assert double.cache_info() == (4, 6, 5, 5)
    double.cache_clear()
    # assert double.cache_info() == (0, 0, 5, 0)


def test_version_object():
    """Check SimpleVersion parsing, comparison operators and int round trip."""
    from whoosh.util.versions import SimpleVersion as sv

    # Parsing a version string must give the same object as passing the
    # individual components to the constructor
    assert sv.parse("1") == sv(1)
    assert sv.parse("1.2") == sv(1, 2)
    assert sv.parse("1.2b") == sv(1, 2, ex="b")
    assert sv.parse("1.2rc") == sv(1, 2, ex="rc")
    assert sv.parse("1.2b3") == sv(1, 2, ex="b", exnum=3)
    assert sv.parse("1.2.3") == sv(1, 2, 3)
    assert sv.parse("1.2.3a") == sv(1, 2, 3, "a")
    assert sv.parse("1.2.3rc") == sv(1, 2, 3, "rc")
    assert sv.parse("1.2.3a4") == sv(1, 2, 3, "a", 4)
    assert sv.parse("1.2.3rc2") == sv(1, 2, 3, "rc", 2)
    assert sv.parse("999.999.999c999") == sv(999, 999, 999, "c", 999)

    # Equality, inequality and ordering between parsed versions
    assert sv.parse("1.2") == sv.parse("1.2")
    # Version strings go through parse(); handing "1.2" directly to the
    # constructor would store the raw string as the first component instead
    # of comparing parsed versions.
    assert sv.parse("1.2") != sv.parse("1.3")
    assert sv.parse("1.0") < sv.parse("1.1")
    assert sv.parse("1.0") < sv.parse("2.0")
    assert sv.parse("1.2.3a4") < sv.parse("1.2.3a5")
    assert sv.parse("1.2.3a5") > sv.parse("1.2.3a4")
    assert sv.parse("1.2.3c99") < sv.parse("1.2.4")
    assert sv.parse("1.2.3a4") != sv.parse("1.2.3a5")
    assert sv.parse("1.2.3a5") != sv.parse("1.2.3a4")
    assert sv.parse("1.2.3c99") != sv.parse("1.2.4")
    assert sv.parse("1.2.3a4") <= sv.parse("1.2.3a5")
    assert sv.parse("1.2.3a5") >= sv.parse("1.2.3a4")
    assert sv.parse("1.2.3c99") <= sv.parse("1.2.4")
    assert sv.parse("1.2") <= sv.parse("1.2")

    # Conversion to an integer and back must be lossless
    assert sv(1, 2, 3).to_int() == 17213488128
    assert sv.from_int(17213488128) == sv(1, 2, 3)