File: test_snapshot.py

package info (click to toggle)
celery 5.5.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,008 kB
  • sloc: python: 64,346; sh: 795; makefile: 378
file content (121 lines) | stat: -rw-r--r-- 3,359 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
from unittest.mock import Mock, patch

import pytest

from celery.app.events import Events
from celery.events.snapshot import Polaroid, evcam


class MockTimer:
    installed = []

    def call_repeatedly(self, secs, fun, *args, **kwargs):
        self.installed.append(fun)
        return Mock(name='TRef')


timer = MockTimer()


class test_Polaroid:

    def setup_method(self):
        self.state = self.app.events.State()

    def test_constructor(self):
        x = Polaroid(self.state, app=self.app)
        assert x.app is self.app
        assert x.state is self.state
        assert x.freq
        assert x.cleanup_freq
        assert x.logger
        assert not x.maxrate

    def test_install_timers(self):
        x = Polaroid(self.state, app=self.app)
        x.timer = timer
        x.__exit__()
        x.__enter__()
        assert x.capture in MockTimer.installed
        assert x.cleanup in MockTimer.installed
        x._tref.cancel.assert_not_called()
        x._ctref.cancel.assert_not_called()
        x.__exit__()
        x._tref.cancel.assert_called()
        x._ctref.cancel.assert_called()
        x._tref.assert_called()
        x._ctref.assert_not_called()

    def test_cleanup(self):
        x = Polaroid(self.state, app=self.app)
        cleanup_signal_sent = [False]

        def handler(**kwargs):
            cleanup_signal_sent[0] = True

        x.cleanup_signal.connect(handler)
        x.cleanup()
        assert cleanup_signal_sent[0]

    def test_shutter__capture(self):
        x = Polaroid(self.state, app=self.app)
        shutter_signal_sent = [False]

        def handler(**kwargs):
            shutter_signal_sent[0] = True

        x.shutter_signal.connect(handler)
        x.shutter()
        assert shutter_signal_sent[0]

        shutter_signal_sent[0] = False
        x.capture()
        assert shutter_signal_sent[0]

    def test_shutter_maxrate(self):
        x = Polaroid(self.state, app=self.app, maxrate='1/h')
        shutter_signal_sent = [0]

        def handler(**kwargs):
            shutter_signal_sent[0] += 1

        x.shutter_signal.connect(handler)
        for i in range(30):
            x.shutter()
            x.shutter()
            x.shutter()
        assert shutter_signal_sent[0] == 1


class test_evcam:

    class MockReceiver:
        raise_keyboard_interrupt = False

        def capture(self, **kwargs):
            if self.__class__.raise_keyboard_interrupt:
                raise KeyboardInterrupt()

    class MockEvents(Events):

        def Receiver(self, *args, **kwargs):
            return test_evcam.MockReceiver()

    def setup_method(self):
        self.app.events = self.MockEvents()
        self.app.events.app = self.app

    def test_evcam(self, restore_logging):
        evcam(Polaroid, timer=timer, app=self.app)
        evcam(Polaroid, timer=timer, loglevel='CRITICAL', app=self.app)
        self.MockReceiver.raise_keyboard_interrupt = True
        try:
            with pytest.raises(SystemExit):
                evcam(Polaroid, timer=timer, app=self.app)
        finally:
            self.MockReceiver.raise_keyboard_interrupt = False

    @patch('celery.platforms.create_pidlock')
    def test_evcam_pidfile(self, create_pidlock):
        evcam(Polaroid, timer=timer, pidfile='/var/pid', app=self.app)
        create_pidlock.assert_called_with('/var/pid')