File: test_functional.py

package info (click to toggle)
python-circuitbreaker 1.4.0-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 132 kB
  • sloc: python: 504; makefile: 3
file content (248 lines) | stat: -rw-r--r-- 7,707 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
from time import sleep

try:
    from unittest.mock import Mock, patch
except ImportError:
    from mock import Mock, patch

from pytest import raises

from circuitbreaker import CircuitBreaker, CircuitBreakerError, \
    CircuitBreakerMonitor, STATE_CLOSED, STATE_HALF_OPEN, STATE_OPEN


def pseudo_remote_call():
    """Stand-in for a remote call; always succeeds and returns ``True``.

    Several tests in this module replace this function via ``patch`` to
    simulate failures.
    """
    outcome = True
    return outcome


@CircuitBreaker()
def circuit_success():
    """Forward to ``pseudo_remote_call`` behind a default-configured breaker."""
    outcome = pseudo_remote_call()
    return outcome


@CircuitBreaker(failure_threshold=1, name="circuit_failure")
def circuit_failure():
    """Always raise ``IOError``.

    Guarded by the breaker named ``circuit_failure``, which is configured
    with ``failure_threshold=1``.
    """
    # Raising the class is equivalent to raising an argument-less instance.
    raise IOError


@CircuitBreaker(failure_threshold=1, name="circuit_generator_failure")
def circuit_generator_failure():
    """Generator that yields ``1`` once and then raises ``IOError``.

    ``pseudo_remote_call`` is invoked before the first value is produced, so
    the call only happens once the generator is actually iterated.
    """
    pseudo_remote_call()
    yield 1
    # Fails only after the single value has been handed to the consumer.
    raise IOError


@CircuitBreaker(failure_threshold=1, name="threshold_1")
def circuit_threshold_1():
    """Forward to ``pseudo_remote_call`` behind the ``threshold_1`` breaker.

    The breaker is configured with ``failure_threshold=1``.
    """
    outcome = pseudo_remote_call()
    return outcome


@CircuitBreaker(failure_threshold=2, recovery_timeout=1, name="threshold_2")
def circuit_threshold_2_timeout_1():
    """Forward to ``pseudo_remote_call`` behind the ``threshold_2`` breaker.

    The breaker is configured with ``failure_threshold=2`` and
    ``recovery_timeout=1``.
    """
    outcome = pseudo_remote_call()
    return outcome


@CircuitBreaker(failure_threshold=3, recovery_timeout=1, name="threshold_3")
def circuit_threshold_3_timeout_1():
    """Forward to ``pseudo_remote_call`` behind the ``threshold_3`` breaker.

    The breaker is configured with ``failure_threshold=3`` and
    ``recovery_timeout=1``.
    """
    outcome = pseudo_remote_call()
    return outcome


def test_circuit_pass_through():
    """A decorated function hands back the wrapped call's return value."""
    outcome = circuit_success()
    assert outcome is True


def test_circuitbreaker_monitor():
    """Check the monitor's circuit listings before and after one breaker trips.

    The expected total of six matches the six ``CircuitBreaker``-decorated
    functions declared in this module.
    """
    def how_many(circuits):
        # Materialise first so that any iterable can be counted.
        return len(list(circuits))

    # Expected starting point: every circuit is reported as closed.
    assert CircuitBreakerMonitor.all_closed() is True
    assert how_many(CircuitBreakerMonitor.get_circuits()) == 6
    assert how_many(CircuitBreakerMonitor.get_closed()) == 6
    assert how_many(CircuitBreakerMonitor.get_open()) == 0

    # One failing call; circuit_failure is declared with failure_threshold=1.
    with raises(IOError):
        circuit_failure()

    # Same total, but one circuit has moved from the closed to the open list.
    assert CircuitBreakerMonitor.all_closed() is False
    assert how_many(CircuitBreakerMonitor.get_circuits()) == 6
    assert how_many(CircuitBreakerMonitor.get_closed()) == 5
    assert how_many(CircuitBreakerMonitor.get_open()) == 1


@patch('test_functional.pseudo_remote_call', return_value=True)
def test_threshold_hit_prevents_consequent_calls(mock_remote):
    # type: (Mock) -> None
    """Once 'threshold_1' is open, further calls never reach the remote.

    :param mock_remote: mock installed by ``patch`` in place of
        ``test_functional.pseudo_remote_call``.
    """
    breaker = CircuitBreakerMonitor.get('threshold_1')
    # Every call that reaches the (mocked) remote now fails.
    mock_remote.side_effect = IOError('Connection refused')

    assert breaker.closed

    # First call goes through to the remote and surfaces the original error.
    with raises(IOError):
        circuit_threshold_1()

    assert breaker.opened

    # Second call is rejected with CircuitBreakerError instead.
    with raises(CircuitBreakerError):
        circuit_threshold_1()

    # Only the first of the two calls reached the remote.
    mock_remote.assert_called_once_with()


@patch('test_functional.pseudo_remote_call', return_value=True)
def test_circuitbreaker_recover_half_open(mock_remote):
    # type: (Mock) -> None
    """Drive 'threshold_3' through closed -> open -> half-open -> open.

    The patched remote call succeeds once and then raises ``IOError`` on
    every call. Three failures are asserted to open the breaker. After
    sleeping for one second (the breaker's ``recovery_timeout``) the state is
    asserted to be half-open, and the next, still failing, call is asserted
    to leave the breaker open again.

    :param mock_remote: mock installed by ``patch`` in place of
        ``test_functional.pseudo_remote_call``.
    """
    circuitbreaker = CircuitBreakerMonitor.get('threshold_3')

    # initial state: closed
    assert circuitbreaker.closed
    assert circuitbreaker.state == STATE_CLOSED

    # no exception -> success
    assert circuit_threshold_3_timeout_1()

    # from now on all subsequent calls will fail
    mock_remote.side_effect = IOError('Connection refused')

    # 1. failed call -> original exception, breaker still closed
    with raises(IOError):
        circuit_threshold_3_timeout_1()
    assert circuitbreaker.closed
    assert circuitbreaker.failure_count == 1

    # 2. failed call -> original exception, breaker still closed
    with raises(IOError):
        circuit_threshold_3_timeout_1()
    assert circuitbreaker.closed
    assert circuitbreaker.failure_count == 2

    # 3. failed call -> original exception
    with raises(IOError):
        circuit_threshold_3_timeout_1()

    # Circuit breaker opens, threshold (3) has been reached
    assert circuitbreaker.opened
    assert circuitbreaker.state == STATE_OPEN
    assert circuitbreaker.failure_count == 3
    assert 0 < circuitbreaker.open_remaining <= 1

    # 4. call while open -> not passed to function -> CircuitBreakerError
    # (failure_count stays at 3 because the function was not executed)
    with raises(CircuitBreakerError):
        circuit_threshold_3_timeout_1()
    assert circuitbreaker.opened
    assert circuitbreaker.failure_count == 3
    assert 0 < circuitbreaker.open_remaining <= 1

    # 5. call while open -> not passed to function -> CircuitBreakerError
    with raises(CircuitBreakerError):
        circuit_threshold_3_timeout_1()
    assert circuitbreaker.opened
    assert circuitbreaker.failure_count == 3
    assert 0 < circuitbreaker.open_remaining <= 1

    # wait for 1 second (the recovery timeout of this breaker)
    sleep(1)

    # circuit half-open -> next call will be passed through;
    # the remaining open time is expected to be negative by now
    assert not circuitbreaker.closed
    assert circuitbreaker.open_remaining < 0
    assert circuitbreaker.state == STATE_HALF_OPEN

    # State half-open -> function is executed -> original exception;
    # the failure is counted and a fresh open window starts
    with raises(IOError):
        circuit_threshold_3_timeout_1()
    assert circuitbreaker.opened
    assert circuitbreaker.failure_count == 4
    assert 0 < circuitbreaker.open_remaining <= 1

    # State open -> not passed to function -> CircuitBreakerError
    with raises(CircuitBreakerError):
        circuit_threshold_3_timeout_1()


@patch('test_functional.pseudo_remote_call', return_value=True)
def test_circuitbreaker_reopens_after_successful_calls(mock_remote):
    # type: (Mock) -> None
    """Drive 'threshold_2' through closed -> open -> half-open -> closed.

    The patched remote call succeeds once, then fails twice (which is
    asserted to open the breaker), and is then made healthy again. While the
    breaker is open, calls are still rejected; after sleeping for one second
    (the breaker's ``recovery_timeout``) a successful call is asserted to
    close the breaker and reset its failure count.

    NOTE(review): despite "reopens" in the test name, the final assertions
    check that the breaker ends up *closed* again.

    :param mock_remote: mock installed by ``patch`` in place of
        ``test_functional.pseudo_remote_call``.
    """
    circuitbreaker = CircuitBreakerMonitor.get('threshold_2')

    # the string form of a breaker is expected to be its name
    assert str(circuitbreaker) == 'threshold_2'

    # initial state: closed
    assert circuitbreaker.closed
    assert circuitbreaker.state == STATE_CLOSED
    assert circuitbreaker.failure_count == 0

    # successful call -> no exception
    assert circuit_threshold_2_timeout_1()

    # from now on all subsequent calls will fail
    mock_remote.side_effect = IOError('Connection refused')

    # 1. failed call -> original exception, breaker still closed
    with raises(IOError):
        circuit_threshold_2_timeout_1()
    assert circuitbreaker.closed
    assert circuitbreaker.failure_count == 1

    # 2. failed call -> original exception
    with raises(IOError):
        circuit_threshold_2_timeout_1()

    # Circuit breaker opens, threshold (2) has been reached
    assert circuitbreaker.opened
    assert circuitbreaker.state == STATE_OPEN
    assert circuitbreaker.failure_count == 2
    assert 0 < circuitbreaker.open_remaining <= 1

    # 3. call while open -> not passed to function -> CircuitBreakerError
    with raises(CircuitBreakerError):
        circuit_threshold_2_timeout_1()
    assert circuitbreaker.opened
    assert circuitbreaker.failure_count == 2
    assert 0 < circuitbreaker.open_remaining <= 1

    # from now on the remote call itself would succeed again
    mock_remote.side_effect = None

    # but the recovery timeout has not been reached -> still open
    # 4. call while open -> not passed to function -> CircuitBreakerError
    with raises(CircuitBreakerError):
        circuit_threshold_2_timeout_1()
    assert circuitbreaker.opened
    assert circuitbreaker.failure_count == 2
    assert 0 < circuitbreaker.open_remaining <= 1

    # wait for 1 second (the recovery timeout of this breaker)
    sleep(1)

    # circuit half-open -> next call will be passed through;
    # the remaining open time is expected to be negative by now
    assert not circuitbreaker.closed
    assert circuitbreaker.failure_count == 2
    assert circuitbreaker.open_remaining < 0
    assert circuitbreaker.state == STATE_HALF_OPEN

    # successful call
    assert circuit_threshold_2_timeout_1()

    # circuit closed and failure count reset
    assert circuitbreaker.closed
    assert circuitbreaker.state == STATE_CLOSED
    assert circuitbreaker.failure_count == 0

    # a few more successful calls
    assert circuit_threshold_2_timeout_1()
    assert circuit_threshold_2_timeout_1()
    assert circuit_threshold_2_timeout_1()


@patch("test_functional.pseudo_remote_call", return_value=True)
def test_circuitbreaker_handles_generator_functions(mock_remote):
    # type: (Mock) -> None
    """A failing generator trips its breaker like a regular function does.

    :param mock_remote: mock installed by ``patch`` in place of
        ``test_functional.pseudo_remote_call``.
    """
    breaker = CircuitBreakerMonitor.get("circuit_generator_failure")
    assert breaker.closed

    # Exhausting the generator surfaces the IOError raised after its yield.
    with raises(IOError):
        list(circuit_generator_failure())

    assert breaker.opened

    # With the breaker open, consuming the generator is rejected instead.
    with raises(CircuitBreakerError):
        list(circuit_generator_failure())

    # Only the first run of the generator body reached the remote.
    mock_remote.assert_called_once_with()