File: test_threads.py

package info (click to toggle)
celery 5.5.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,008 kB
  • sloc: python: 64,346; sh: 795; makefile: 378
file content (103 lines) | stat: -rw-r--r-- 2,427 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
from unittest.mock import patch

import pytest

from celery.utils.threads import Local, LocalManager, _FastLocalStack, _LocalStack, bgThread
from t.unit import conftest


class test_bgThread:
    """Tests for the ``bgThread`` base class."""

    def test_crash(self):
        """An exception raised by ``body`` makes ``run`` call ``os._exit(1)``.

        ``os._exit`` is patched with a mock that raises ``ValueError``; the
        test expects that error to propagate out of ``run()``.
        """

        class T(bgThread):

            def body(self):
                raise KeyError()

        exit_patch = patch('os._exit', side_effect=ValueError())
        with exit_patch as fake_exit, conftest.stdouts():
            crashing_thread = T()
            with pytest.raises(ValueError):
                crashing_thread.run()
            fake_exit.assert_called_with(1)

    def test_interface(self):
        """Calling ``body`` on a plain ``bgThread`` raises ``NotImplementedError``."""
        base_thread = bgThread()
        with pytest.raises(NotImplementedError):
            base_thread.body()


class test_Local:
    """Tests for ``Local``."""

    def test_iter(self):
        """Iterating a ``Local`` reflects attribute assignment and deletion."""
        local = Local()
        local.foo = 'bar'
        current_ident = local.__ident_func__()
        expected_entry = (current_ident, {'foo': 'bar'})
        assert expected_entry in list(iter(local))

        del local.foo
        assert expected_entry not in list(iter(local))
        # Deleting the attribute a second time must fail.
        with pytest.raises(AttributeError):
            del local.foo

        # Calling the local with a callable yields a non-None object.
        assert local(lambda: 'foo') is not None


class test_LocalStack:
    """Tests for ``_LocalStack``."""

    def test_stack(self):
        """Exercise pop/push and indexing the object returned by calling the stack."""
        stack = _LocalStack()
        assert stack.pop() is None
        stack.__release_local__()
        # Read the ident function and assign it straight back.
        stack.__ident_func__ = stack.__ident_func__

        def first_of_top():
            # Call the stack and index the result.
            return stack()[0]

        # Nothing has been pushed yet: indexing must raise.
        with pytest.raises(RuntimeError):
            first_of_top()

        stack.push(['foo'])
        assert first_of_top() == 'foo'

        # After popping the only item, indexing raises again.
        stack.pop()
        with pytest.raises(RuntimeError):
            first_of_top()


class test_FastLocalStack:
    """Tests for ``_FastLocalStack``."""

    def test_stack(self):
        """``top`` follows the last pushed item and is ``None`` once emptied."""
        stack = _FastLocalStack()
        for item in (['foo'], ['bar']):
            stack.push(item)

        assert stack.top == ['bar']
        assert len(stack) == 2

        stack.pop()
        assert stack.top == ['foo']

        stack.pop()
        assert stack.top is None


class test_LocalManager:
    """Tests for ``LocalManager``."""

    def test_init(self):
        """Check construction variants, ident handling, cleanup and repr."""
        default_manager = LocalManager()
        assert default_manager.locals == []
        assert default_manager.ident_func

        def fixed_ident():
            return 1

        loc = Local()

        # The locals may be given as a list...
        list_manager = LocalManager([loc], ident_func=fixed_ident)
        assert list_manager.locals == [loc]

        # ...or as a single object; either way ``locals`` is a one-item list.
        manager = LocalManager(loc, ident_func=fixed_ident)
        assert manager.locals == [loc]
        assert manager.ident_func is fixed_ident
        assert manager.locals[0].__ident_func__ is fixed_ident
        assert manager.get_ident() == 1

        release_patch = patch('celery.utils.threads.release_local')
        with release_patch as fake_release:
            manager.cleanup()
            fake_release.assert_called_with(loc)

        assert repr(manager)