File: test_transaction.py

package info (click to toggle)
python-django-pgtransaction 2.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 504 kB
  • sloc: python: 551; makefile: 101; sh: 9
file content (392 lines) | stat: -rw-r--r-- 11,396 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
import threading
import time

import ddf
import pytest
from django.db import transaction
from django.db.utils import InternalError, OperationalError

import pgtransaction
from pgtransaction.tests.models import Trade
from pgtransaction.transaction import atomic

try:
    import psycopg.errors as psycopg_errors
except ImportError:
    import psycopg2.errors as psycopg_errors


@pytest.mark.django_db()
def test_atomic_read_committed():
    """A Trade created inside an atomic block using READ_COMMITTED is visible afterwards."""
    read_committed = atomic(isolation_level=pgtransaction.READ_COMMITTED)
    with read_committed:
        ddf.G(Trade)

    trade_count = Trade.objects.count()
    assert trade_count == 1


@pytest.mark.django_db()
def test_atomic_repeatable_read():
    """A Trade created inside an atomic block using REPEATABLE_READ is visible afterwards."""
    repeatable_read = atomic(isolation_level=pgtransaction.REPEATABLE_READ)
    with repeatable_read:
        ddf.G(Trade)

    trade_count = Trade.objects.count()
    assert trade_count == 1


@pytest.mark.django_db(transaction=True)
def test_atomic_repeatable_read_with_select():
    """Read and then update an existing Trade inside a REPEATABLE READ block."""
    ddf.G(Trade, price=1)

    with atomic(isolation_level="REPEATABLE READ"):
        latest = Trade.objects.last()
        latest.price = 2
        latest.save()

    # The update must not have produced an additional row.
    trade_count = Trade.objects.count()
    assert trade_count == 1


@pytest.mark.django_db()
def test_atomic_serializable():
    """A Trade created inside an atomic block using SERIALIZABLE is visible afterwards."""
    serializable = atomic(isolation_level=pgtransaction.SERIALIZABLE)
    with serializable:
        ddf.G(Trade)

    trade_count = Trade.objects.count()
    assert trade_count == 1


@pytest.mark.django_db()
def test_atomic_decorator():
    """``atomic`` can wrap a function as a decorator; the wrapped write is persisted."""

    @atomic(isolation_level="REPEATABLE READ")
    def create_trade():
        ddf.G(Trade)

    create_trade()

    trade_count = Trade.objects.count()
    assert trade_count == 1


@pytest.mark.django_db(transaction=True)
def test_atomic_decorator_with_args():
    """A function decorated with ``atomic`` still receives its positional arguments."""

    @atomic(isolation_level="REPEATABLE READ")
    def reprice(trade_id):
        fetched = Trade.objects.get(id=trade_id)
        fetched.price = 2
        fetched.save()

    existing = ddf.G(Trade, price=1)
    reprice(existing.pk)

    # Only the originally created row should exist.
    trade_count = Trade.objects.count()
    assert trade_count == 1


@pytest.mark.django_db(transaction=True)
def test_atomic_nested_isolation_levels():
    """Check which nested isolation-level combinations pass and which raise InternalError."""
    # No statement has been issued yet, so the nested block may set a level.
    with transaction.atomic(), atomic(isolation_level="SERIALIZABLE"):
        pass

    # After a statement has been issued, switching to a different
    # isolation level is rejected.
    with pytest.raises(InternalError), atomic(isolation_level="REPEATABLE READ"):
        ddf.G(Trade)
        with atomic(isolation_level="SERIALIZABLE"):
            pass

    # Asking again for the isolation level already in use is permitted.
    with atomic(isolation_level="REPEATABLE READ"):
        ddf.G(Trade)
        with atomic(isolation_level="REPEATABLE READ"):
            pass

    # Final sanity check: a matching level followed by a different one,
    # both nested after a statement, still fails.
    with pytest.raises(InternalError), atomic(isolation_level="REPEATABLE READ"):
        ddf.G(Trade)
        with atomic(isolation_level="REPEATABLE READ"), atomic(isolation_level="SERIALIZABLE"):
            pass


@pytest.mark.django_db(transaction=True)
def test_atomic_nested_read_modes():
    """Check which nested read-mode combinations pass and which raise InternalError."""
    # No statement has been issued yet, so the nested block may set a read mode.
    with transaction.atomic(), atomic(read_mode=pgtransaction.READ_ONLY):
        pass

    # READ_ONLY may be nested inside READ_WRITE even after statements ran.
    with atomic(read_mode=pgtransaction.READ_WRITE):
        ddf.G(Trade)
        with atomic(read_mode=pgtransaction.READ_ONLY):
            # Reading is fine inside the nested read-only block.
            Trade.objects.count()

    # Nesting the same read mode is permitted, and both levels may write.
    with atomic(read_mode=pgtransaction.READ_WRITE):
        ddf.G(Trade)
        with atomic(read_mode=pgtransaction.READ_WRITE):
            ddf.G(Trade)

    # READ WRITE cannot be requested once any query has been issued
    # in a READ_ONLY transaction.
    with pytest.raises(InternalError), atomic(read_mode=pgtransaction.READ_ONLY):
        Trade.objects.count()
        with atomic(read_mode=pgtransaction.READ_WRITE):
            pass


@pytest.mark.django_db()
def test_atomic_with_nested_atomic():
    """Writes from an outer block and a nested argument-less ``atomic()`` both persist."""
    outer_block = atomic(isolation_level="REPEATABLE READ")
    with outer_block:
        ddf.G(Trade)
        with atomic():
            ddf.G(Trade)

    trade_count = Trade.objects.count()
    assert trade_count == 2


@pytest.mark.django_db()
def test_atomic_rollback():
    """An exception escaping the atomic block leaves no Trade behind."""
    expected_failure = pytest.raises(Exception, match="Exception thrown")
    with expected_failure, atomic(isolation_level="REPEATABLE READ"):
        ddf.G(Trade)
        raise Exception("Exception thrown")

    any_trades = Trade.objects.exists()
    assert not any_trades


@pytest.mark.django_db()
def test_pg_atomic_nested_atomic_rollback():
    """Only the nested block's write is discarded when the nested block fails.

    The RuntimeError raised inside the nested ``atomic()`` is caught within the
    outer block, and exactly one Trade remains afterwards.
    """
    outer_block = atomic(isolation_level="REPEATABLE READ")
    with outer_block:
        ddf.G(Trade)
        try:
            with atomic():
                ddf.G(Trade)
                raise RuntimeError
        except RuntimeError:
            # Swallowed on purpose so the outer block exits normally.
            pass

    trade_count = Trade.objects.count()
    assert trade_count == 1


@pytest.mark.django_db(transaction=True)
def test_atomic_retries_context_manager_not_allowed():
    """Using ``atomic`` with ``retry`` as a context manager raises RuntimeError."""
    expected_failure = pytest.raises(RuntimeError, match="as a context manager")
    with expected_failure, atomic(isolation_level="REPEATABLE READ", retry=1):
        pass


@pytest.mark.django_db()
def test_atomic_nested_retries_not_permitted():
    """``retry`` inside an already open transaction raises RuntimeError.

    Covers both the context-manager form and the decorator form.
    """
    # Context-manager form nested in a plain Django transaction.
    with pytest.raises(RuntimeError, match="Retries are not permitted"), transaction.atomic():
        with atomic(isolation_level="REPEATABLE READ", retry=1):
            pass

    @atomic(isolation_level="REPEATABLE READ", retry=1)
    def retrying_noop():
        pass

    # Decorator form called from inside a plain Django transaction.
    with pytest.raises(RuntimeError, match="Retries are not permitted"), transaction.atomic():
        retrying_noop()


@pytest.mark.django_db(transaction=True)
def test_atomic_retries_all_retries_fail():
    """When every attempt fails, the error propagates and nothing is persisted.

    The decorated function always raises an OperationalError chained from a
    SerializationFailure. With ``retry=2`` each call is expected to run the
    function three times, and a second call starts with a fresh retry budget.
    """
    assert not Trade.objects.exists()
    attempts = []

    @atomic(isolation_level="REPEATABLE READ", retry=2)
    def func(attempts):
        # Record the attempt on the list passed in by the caller, then fail.
        attempts.append(True)
        ddf.G(Trade)
        raise OperationalError from psycopg_errors.SerializationFailure

    with pytest.raises(OperationalError):
        func(attempts)

    assert not Trade.objects.exists()
    assert len(attempts) == 3

    # Ensure the decorator tries again
    with pytest.raises(OperationalError):
        func(attempts)

    assert not Trade.objects.exists()
    assert len(attempts) == 6


@pytest.mark.django_db(transaction=True)
def test_atomic_retries_decorator_first_retry_passes():
    """A failure on the first attempt is retried, and the second attempt persists one Trade."""
    any_trades = Trade.objects.exists()
    assert not any_trades
    attempts = []

    @atomic(isolation_level="REPEATABLE READ", retry=1)
    def fail_once(attempts):
        attempts.append(True)
        ddf.G(Trade)
        first_attempt = len(attempts) == 1
        if first_attempt:
            raise OperationalError from psycopg_errors.SerializationFailure

    fail_once(attempts)

    trade_count = Trade.objects.all().count()
    assert trade_count == 1
    assert len(attempts) == 2


@pytest.mark.django_db(transaction=True)
def test_pg_atomic_retries_with_nested_atomic_failure():
    """A failure caught inside the outer function does not trigger an outer retry.

    The nested function raises SerializationFailure, which the outer function
    catches. The nested function runs once and one Trade remains.
    """
    any_trades = Trade.objects.exists()
    assert not any_trades
    attempts = []

    @atomic(isolation_level="REPEATABLE READ", retry=2)
    def outer(attempts):
        ddf.G(Trade)

        @atomic
        def failing_inner(attempts):
            attempts.append(True)
            ddf.G(Trade)
            raise psycopg_errors.SerializationFailure

        try:
            failing_inner(attempts)
        except psycopg_errors.SerializationFailure:
            # Handled here, so the outer function returns normally.
            pass

    outer(attempts)

    trade_count = Trade.objects.all().count()
    assert trade_count == 1
    assert len(attempts) == 1


@pytest.mark.django_db(transaction=True)
def test_atomic_retries_with_run_time_failure():
    """A RuntimeError is not retried: one attempt runs and nothing is persisted."""
    any_trades = Trade.objects.exists()
    assert not any_trades
    attempts = []

    @atomic(isolation_level="REPEATABLE READ", retry=2)
    def always_fails(attempts):
        attempts.append(True)
        ddf.G(Trade)
        raise RuntimeError

    with pytest.raises(RuntimeError):
        always_fails(attempts)

    any_trades = Trade.objects.all().exists()
    assert not any_trades
    assert len(attempts) == 1


@pytest.mark.django_db(transaction=True)
def test_atomic_retries_with_nested_atomic_and_outer_retry():
    """A retried outer function re-runs its nested atomic function.

    The outer function fails after the first pass through the nested function.
    After the retry the nested function has run twice and two Trades exist.
    """
    any_trades = Trade.objects.exists()
    assert not any_trades
    attempts = []

    @atomic(isolation_level="REPEATABLE READ", retry=1)
    def outer(attempts):
        ddf.G(Trade)

        @atomic
        def nested(attempts):
            attempts.append(True)
            ddf.G(Trade)

        nested(attempts)

        # Fail only on the first pass so that the retry can succeed.
        first_pass = len(attempts) == 1
        if first_pass:
            raise OperationalError from psycopg_errors.SerializationFailure

    outer(attempts)

    trade_count = Trade.objects.all().count()
    assert trade_count == 2
    assert len(attempts) == 2


@pytest.mark.django_db(transaction=True)
def test_concurrent_serialization_error():
    """
    Simulate a concurrency issue that will throw a serialization error.
    Ensure that a retry is successful.

    Two threads update the same Trade through a function decorated with a
    SERIALIZABLE ``atomic`` using ``retry=3``. Every invocation of that
    function is recorded in the shared ``calls`` list.
    """

    def concurrent_update(barrier, trade, calls):
        # We have to instantiate the decorator inside the function, otherwise
        # it is shared among threads and causes the test to hang. It's uncertain
        # what causes it to hang.
        @pgtransaction.atomic(isolation_level="SERIALIZABLE", retry=3)
        def inner_update(trade, calls):
            calls.append(True)
            trade = Trade.objects.get(id=trade.id)
            trade.price = 2
            trade.save()
            # Sleep before the function returns; presumably this keeps both
            # threads' transactions open long enough to conflict.
            time.sleep(1)

        # Wait until both threads have reached this point before updating.
        barrier.wait()
        inner_update(trade, calls)

    barrier = threading.Barrier(2)
    trade = ddf.G(Trade, price=1)
    calls = []
    t1 = threading.Thread(target=concurrent_update, args=[barrier, trade, calls])
    t2 = threading.Thread(target=concurrent_update, args=[barrier, trade, calls])

    t1.start()
    t2.start()

    t1.join()
    t2.join()

    # We should have at least had three attempts (one per thread plus at least one
    # retry). It's highly unlikely we would have four, but the possibility exists.
    assert 3 <= len(calls) <= 4


@pytest.mark.django_db(transaction=True)
def test_atomic_read_only():
    """Saving a Trade inside a READ_ONLY atomic block raises InternalError."""
    read_only = atomic(read_mode=pgtransaction.READ_ONLY)
    with read_only:
        candidate = ddf.N(Trade)
        # The write below must be rejected.
        with pytest.raises(InternalError):
            if candidate is not None:  # pragma: no branch - we always hit this branch
                candidate.price = 2
                candidate.save()


@pytest.mark.django_db(transaction=True)
def test_atomic_deferrable_validation():
    """DEFERRABLE is rejected on its own and accepted with SERIALIZABLE plus READ_ONLY."""
    # DEFERRABLE without SERIALIZABLE and READ ONLY must raise ValueError.
    expected_failure = pytest.raises(ValueError, match="DEFFERABLE transactions have no effect")
    with expected_failure, atomic(deferrable=pgtransaction.DEFERRABLE):  # type: ignore - also yields a type error.
        pass

    # The full combination is accepted.
    deferrable_block = atomic(
        isolation_level=pgtransaction.SERIALIZABLE,
        read_mode=pgtransaction.READ_ONLY,
        deferrable=pgtransaction.DEFERRABLE,
    )
    with deferrable_block:
        pass


@pytest.mark.django_db(transaction=True)
def test_deferrable_read_only_behavior():
    """Test behavior of deferrable read only transactions.

    A Trade is read inside a SERIALIZABLE, READ_ONLY, DEFERRABLE atomic block.
    A second thread then changes and saves the row. A further read inside the
    same block must still return the original value.
    """
    trade = ddf.G(Trade, company="Company 1")

    def modify_data() -> None:
        # Runs in the worker thread: rename the company in its own atomic block.
        with atomic():
            trade_obj = Trade.objects.get(id=trade.id)
            trade_obj.company = "Company 2"
            trade_obj.save()

    thread = threading.Thread(target=modify_data)

    with atomic(
        isolation_level=pgtransaction.SERIALIZABLE,
        read_mode=pgtransaction.READ_ONLY,
        deferrable=pgtransaction.DEFERRABLE,
    ):
        initial_read = Trade.objects.get(id=trade.id)
        assert initial_read.company == "Company 1"

        # Let the writer run to completion while this transaction is still open.
        thread.start()
        thread.join()

        # Data should stay the same.
        final_read = Trade.objects.get(id=trade.id)
        assert final_read.company == "Company 1"