File: test_timer2.py

package info (click to toggle)
celery 5.5.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,008 kB
  • sloc: python: 64,346; sh: 795; makefile: 378
file content (105 lines) | stat: -rw-r--r-- 3,215 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import sys
import time
from unittest.mock import Mock, call, patch

from celery.utils import timer2 as timer2


class test_Timer:

    def test_enter_after(self):
        t = timer2.Timer()
        try:
            done = [False]

            def set_done():
                done[0] = True

            t.call_after(0.3, set_done)
            mss = 0
            while not done[0]:
                if mss >= 2.0:
                    raise Exception('test timed out')
                time.sleep(0.1)
                mss += 0.1
        finally:
            t.stop()

    def test_exit_after(self):
        t = timer2.Timer()
        t.call_after = Mock()
        t.exit_after(0.3, priority=10)
        t.call_after.assert_called_with(0.3, sys.exit, 10)

    def test_ensure_started_not_started(self):
        t = timer2.Timer()
        t.running = True
        t.start = Mock()
        t.ensure_started()
        t.start.assert_not_called()
        t.running = False
        t.on_start = Mock()
        t.ensure_started()
        t.on_start.assert_called_with(t)
        t.start.assert_called_with()

    @patch('celery.utils.timer2.sleep')
    @patch('os._exit')  # To ensure the test fails gracefully
    def test_on_tick(self, _exit, sleep):
        def next_entry_side_effect():
            # side effect simulating following scenario:
            # 3.33, 3.33, 3.33, <shutdown event set>
            for _ in range(3):
                yield 3.33
            while True:
                yield getattr(t, "_Timer__is_shutdown").set()

        on_tick = Mock(name='on_tick')
        t = timer2.Timer(on_tick=on_tick)
        t._next_entry = Mock(
            name='_next_entry', side_effect=next_entry_side_effect()
        )
        t.run()
        sleep.assert_called_with(3.33)
        on_tick.assert_has_calls([call(3.33), call(3.33), call(3.33)])
        _exit.assert_not_called()

    @patch('os._exit')
    def test_thread_crash(self, _exit):
        t = timer2.Timer()
        t._next_entry = Mock()
        t._next_entry.side_effect = OSError(131)
        t.run()
        _exit.assert_called_with(1)

    def test_gc_race_lost(self):
        t = timer2.Timer()
        with patch.object(t, "_Timer__is_stopped") as mock_stop_event:
            # Mark the timer as shutting down so we escape the run loop,
            # mocking the running state so we don't block!
            with patch.object(t, "running", new=False):
                t.stop()
            # Pretend like the interpreter has shutdown and GCed built-in
            # modules, causing an exception
            mock_stop_event.set.side_effect = TypeError()
            t.run()
        mock_stop_event.set.assert_called_with()

    def test_test_enter(self):
        t = timer2.Timer()
        t._do_enter = Mock()
        e = Mock()
        t.enter(e, 13, 0)
        t._do_enter.assert_called_with('enter_at', e, 13, priority=0)

    def test_test_enter_after(self):
        t = timer2.Timer()
        t._do_enter = Mock()
        t.enter_after()
        t._do_enter.assert_called_with('enter_after')

    def test_cancel(self):
        t = timer2.Timer()
        tref = Mock()
        t.cancel(tref)
        tref.cancel.assert_called_with()