File: test_database.py

package info (click to toggle)
python-hypothesis 6.138.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 15,272 kB
  • sloc: python: 62,853; ruby: 1,107; sh: 253; makefile: 41; javascript: 6
file content (227 lines) | stat: -rw-r--r-- 7,306 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import math
import sys
import time
from collections import Counter

import pytest

from hypothesis import Phase, settings
from hypothesis.database import (
    DirectoryBasedExampleDatabase,
    InMemoryExampleDatabase,
    MultiplexedDatabase,
)
from hypothesis.internal.reflection import get_pretty_function_description

from tests.common.utils import flaky, skipif_threading
from tests.cover.test_database_backend import _database_conforms_to_listener_api

# we need real time here, not monkeypatched for CI
time_sleep = time.sleep


def test_database_listener_directory():
    # this test is very expensive because we wait between every rule for the
    # filesystem observer to fire. Limit examples/step count as much as possible.
    _database_conforms_to_listener_api(
        lambda path: DirectoryBasedExampleDatabase(path),
        flush=lambda _db: time_sleep(0.2),
        supports_value_delete=False,
        # expensive flush makes shrinking take forever
        parent_settings=settings(
            max_examples=5, stateful_step_count=10, phases=set(Phase) - {Phase.shrink}
        ),
    )


# seen flaky on test-win; we get *three* of the same save events in the first
# assertion, which...is baffling, and possibly a genuine bug (most likely in
# watchdog).
@flaky(max_runs=5, min_passes=1)
@skipif_threading  # add_listener is not thread safe because watchdog is not
def test_database_listener_multiplexed(tmp_path):
    db = MultiplexedDatabase(
        InMemoryExampleDatabase(), DirectoryBasedExampleDatabase(tmp_path)
    )
    events = []

    def listener(event):
        events.append(event)

    db.add_listener(listener)

    db.save(b"a", b"a")
    time_sleep(0.2)
    assert events == [("save", (b"a", b"a"))] * 2

    db.remove_listener(listener)
    db.delete(b"a", b"a")
    db.save(b"a", b"b")
    time_sleep(0.2)
    assert events == [("save", (b"a", b"a"))] * 2

    db.add_listener(listener)
    db.delete(b"a", b"b")
    db.save(b"a", b"c")
    time_sleep(0.2)
    # InMemory database fires immediately, while DirectoryBased has to
    # wait for filesystem listeners. Therefore the events can arrive out of
    # order. Test a weaker multiset property, disregarding ordering.
    assert Counter(events[2:]) == {
        # InMemory
        ("delete", (b"a", b"b")): 1,
        # DirectoryBased
        ("delete", (b"a", None)): 1,
        # both
        ("save", (b"a", b"c")): 2,
    }


def wait_for(condition, *, timeout=1, interval=0.01):
    for _ in range(math.ceil(timeout / interval)):
        if condition():
            return
        time_sleep(interval)
    raise Exception(
        f"timing out after waiting {timeout}s for condition "
        f"{get_pretty_function_description(condition)}"
    )


# seen flaky on check-coverage (timeout in first wait_for)
@flaky(max_runs=5, min_passes=1)
@skipif_threading  # add_listener is not thread safe because watchdog is not
def test_database_listener_directory_explicit(tmp_path):
    db = DirectoryBasedExampleDatabase(tmp_path)
    events = []

    def listener(event):
        events.append(event)

    db.add_listener(listener)

    db.save(b"k1", b"v1")
    wait_for(lambda: events == [("save", (b"k1", b"v1"))], timeout=5)

    db.remove_listener(listener)
    db.delete(b"k1", b"v1")
    db.save(b"k1", b"v2")
    time_sleep(0.2)
    assert events == [("save", (b"k1", b"v1"))]

    db.add_listener(listener)
    db.delete(b"k1", b"v2")
    db.save(b"k1", b"v3")
    wait_for(
        lambda: events[1:]
        == [
            ("delete", (b"k1", None)),
            ("save", (b"k1", b"v3")),
        ],
        timeout=5,
    )

    # moving into a nonexistent key
    db.move(b"k1", b"k2", b"v3")
    time_sleep(0.5)
    # moving back into an existing key
    db.move(b"k2", b"k1", b"v3")
    time_sleep(0.5)

    if sys.platform.startswith("darwin"):
        assert events[3:] == [
            ("delete", (b"k1", b"v3")),
            ("save", (b"k2", b"v3")),
            ("delete", (b"k2", b"v3")),
            ("save", (b"k1", b"v3")),
        ], str(events[3:])
    elif sys.platform.startswith("win"):
        # watchdog fires save/delete events instead of move events on windows.
        # This means we don't broadcast the exact deleted value.
        assert events[3:] == [
            ("delete", (b"k1", None)),
            ("save", (b"k2", b"v3")),
            ("delete", (b"k2", None)),
            ("save", (b"k1", b"v3")),
        ], str(events[3:])
    elif sys.platform.startswith("linux"):
        # move #1
        assert ("save", (b"k2", b"v3")) in events
        # sometimes watchdog fires a move event (= save + delete with value),
        # and other times it fires separate save and delete events (= delete with
        # no value). I think this is due to particulars of what happens when
        # a new directory gets created very close to the time when a file is
        # saved to that directory.
        assert any(("delete", (b"k1", val)) in events for val in [b"v3", None])

        # move #2
        assert ("save", (b"k1", b"v3")) in events
        assert any(("delete", (b"k2", val)) in events for val in [b"v3", None])
    else:
        raise NotImplementedError(f"unknown platform {sys.platform}")


# seen flaky on windows CI (timeout in wait_for).
# when this happens it seems to occur consistently within that run, so the
# @flaky doesn't help.
@pytest.mark.skipif(sys.platform.startswith("win"), reason="too flaky on windows")
@flaky(max_runs=5, min_passes=1)
@skipif_threading  # add_listener is not thread safe because watchdog is not
def test_database_listener_directory_move(tmp_path):
    db = DirectoryBasedExampleDatabase(tmp_path)
    events = []

    def listener(event):
        events.append(event)

    # make sure both keys exist and that v1 exists in k1 and not k2
    db.save(b"k1", b"v1")
    db.save(b"k2", b"v_unrelated")

    time_sleep(0.1)
    db.add_listener(listener)
    time_sleep(0.1)

    db.move(b"k1", b"k2", b"v1")
    # events might arrive in either order
    wait_for(
        lambda: set(events)
        == {
            ("save", (b"k2", b"v1")),
            # windows doesn't fire move events, so value is None
            ("delete", (b"k1", None if sys.platform.startswith("win") else b"v1")),
        },
        timeout=5,
    )


@skipif_threading  # add_listener is not thread safe because watchdog is not
def test_still_listens_if_directory_did_not_exist(tmp_path):
    # if we start listening on a nonexistent path, we will create that path and
    # still listen for events
    events = []

    def listener(event):
        events.append(event)

    p = tmp_path / "does_not_exist_yet"
    db = DirectoryBasedExampleDatabase(p)
    assert not p.exists()

    db.add_listener(listener)
    assert p.exists()

    assert not events
    db.save(b"k1", b"v1")
    time_sleep(0.2)
    assert len(events) == 1